Posted to user@flink.apache.org by Ashish Pokharel <as...@yahoo.com> on 2020/05/05 01:41:08 UTC

Restore from savepoint with Iterations

Hi all,

Hope everyone is doing well!

I am running into what looks like a deadlock (the application stalls) in a Flink streaming job upon restore from a savepoint. The job has a slowly moving stream (S1) that needs to be “stateful” and a continuous stream (S2) that is “joined” with the slowly moving stream (S1). Some level of loss/repetition is acceptable in the continuous stream (S2), so it can rely on something like the Kafka consumer state across restarts. The continuous stream (S2), however, needs to be iterated through the state from the slowly moving stream (S1) a few times (mostly 2). The state is fairly large (it ends up being 15 GB on HDFS).

When the job is restarted with no continuous (S2) data on the topic, the job starts up, restores state, and completes its initial checkpoint within 3 minutes. However, when the app is started from a savepoint and continuous (S2) data is actually present in Kafka, the application seems to come to a halt. Looking at the progress of checkpoints, every attempt seems to be stuck until it times out at around 10 minutes. If the iteration on the stream is removed, the app can successfully start and checkpoint even while the continuous stream (S2) is flowing in. Unfortunately we are working in a hosted environment for both data and platform, so debugging with thread dumps etc. will be challenging.

I couldn’t find a known issue on this, but was wondering if anyone has seen such behavior or knows of any related issues from the past. It does look like checkpointing has to be forced to get an iterative job to checkpoint in the first place (an option that is already marked deprecated; we are on version 1.8.2 as of now). I do understand the challenges around consistently checkpointing an iterative stream. As I mentioned earlier, what I really want to maintain, for the most part, is the state of the slowly moving dimensions. Iterations do solve the problem at hand (multiple loops of logic) pretty gracefully, but not being able to restore from a savepoint will be a show stopper.
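
For context, the shape of the job is roughly the sketch below (Flink 1.8.x DataStream API; Event, kafkaSource, EnrichAgainstS1State, and needsAnotherPass() are placeholder names, not our actual code):

    // Classes from org.apache.flink.streaming.api.* (1.8.x).
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // On 1.8.x an iterative job only checkpoints when the (deprecated) "force" flag is set.
    env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE, true);

    DataStream<Event> s2 = env.addSource(kafkaSource);   // continuous stream (S2)
    IterativeStream<Event> loop = s2.iterate();          // no iteration timeout in prod

    // Enrich against the keyed state built from the slowly moving stream (S1).
    DataStream<Event> enriched = loop.map(new EnrichAgainstS1State());

    // Records that still need another pass (mostly just one more) are fed back.
    loop.closeWith(enriched.filter(e -> e.needsAnotherPass()));
    DataStream<Event> done = enriched.filter(e -> !e.needsAnotherPass());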

I will appreciate any pointers / suggestions.

Thanks in advance, 

Ashish

Re: Restore from savepoint with Iterations

Posted by ashish pok <as...@yahoo.com>.
Let me see if I can add an artificial throttle somewhere. The volume of data is really high, hence I am trying to avoid round trips through Kafka too. It looks like the options are “not so elegant” until FLIP-15. Thanks for the pointers again!!!


On Monday, May 4, 2020, 11:06 PM, Ken Krugler <kk...@transpac.com> wrote:

[quoted thread history trimmed; Ken’s reply and the earlier messages appear in full below]


Re: Restore from savepoint with Iterations

Posted by Ken Krugler <kk...@transpac.com>.
Hi Ashish,

The workaround we did was to throttle the data flowing in the iteration (in code), though I’m not sure whether that’s possible in your situation.
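
Roughly like this (just a sketch: the Event type and the 1000 records/sec/subtask rate are made-up placeholders), with a blocking rate limiter on the feedback edge so the loop’s buffers can’t fill faster than they drain:

    // Uses Guava's RateLimiter; acquire() blocks the subtask until a permit
    // is available, capping the rate of records flowing around the loop.
    DataStream<Event> throttled = feedback.map(new RichMapFunction<Event, Event>() {
        private transient RateLimiter limiter;

        @Override
        public void open(Configuration parameters) {
            limiter = RateLimiter.create(1000.0);  // permits/second, per subtask
        }

        @Override
        public Event map(Event value) {
            limiter.acquire();
            return value;
        }
    });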

You could remove the iteration by writing to a Kafka topic at the end of the part of your workflow that is currently an iteration, and then consuming from that same topic as your “iteration” source.
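
In sketch form (the topic name, schemas, and properties are placeholders; this assumes the universal Kafka connector):

    // Tail of the former iteration: records needing another pass go to a topic...
    needsAnotherPass.addSink(
        new FlinkKafkaProducer<>("loop-topic", serializationSchema, producerProps));

    // ...and the head of the workflow unions a consumer on that same topic
    // back into the main input, replacing the in-job feedback edge.
    DataStream<Event> feedback = env.addSource(
        new FlinkKafkaConsumer<>("loop-topic", deserializationSchema, consumerProps));
    DataStream<Event> input = s2.union(feedback);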

— Ken


> On May 4, 2020, at 7:32 PM, Ashish Pokharel <as...@yahoo.com> wrote:
> 
> [quoted thread history trimmed; the earlier messages appear in full elsewhere in this thread]
> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Restore from savepoint with Iterations

Posted by Ashish Pokharel <as...@yahoo.com>.
Hi Ken,

Thanks for the quick response!

I came across FLIP-15 on my next Google search right after I sent the email :) It DEFINITELY looks that way. Watching the logs and the way the job gets stuck, it does look like a buffer is blocked. But FLIP-15 hasn’t moved further, though. So there are no workarounds at all at this point? Perhaps a technique to block the Kafka consumer for some time? Even that might get me going, but it looks like there is a probability of this happening during normal processing as well, as your use case demonstrates. I am using the iteration with no timeout for the prod job, and use timeouts only in unit testing. The theory was that in prod the input stream will be indefinite, and a long lull with no events might sometimes happen during maintenance, backlogs, etc. I really would like to avoid bloating the DAG by repeating the same functions with filters and side outputs. Beyond the obvious repetition, it would increase the size of the state by a factor. Even those slowly moving dimensions are not light (around half a billion records every day) :)
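
For illustration, the unrolled alternative I want to avoid looks roughly like this (placeholder names, with filters standing in for the side outputs), where the enrichment operator and its copy of the S1-derived state get duplicated per pass:

    // Pass 1 and pass 2 are separate operators, each holding its own keyed state.
    DataStream<Event> pass1 = s2.keyBy(Event::getKey).map(new EnrichAgainstS1State());
    DataStream<Event> pass2 = pass1.filter(e -> e.needsAnotherPass())
                                   .keyBy(Event::getKey)
                                   .map(new EnrichAgainstS1State());
    DataStream<Event> done = pass1.filter(e -> !e.needsAnotherPass()).union(pass2);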

> On May 4, 2020, at 10:13 PM, Ken Krugler <kk...@transpac.com> wrote:
> 
> [quoted thread history trimmed; Ken’s message and the original post appear in full elsewhere in this thread]
> 


Re: Restore from savepoint with Iterations

Posted by Ken Krugler <kk...@transpac.com>.
Hi Ashish,

Wondering if you’re running into the gridlock problem I mention on slide #25 here: https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-ken-krugler-building-a-scalable-focused-web-crawler-with-flink

If the iteration path has too much data in it, then the network buffer at the head of the iteration can fill up, and it never clears out, because the operator consuming those buffers is blocked writing to the next operator in the iteration, and so on back to the head.

We ran into this when outlinks from web pages caused fan-out/amplification of the data being iterated, but maybe you hit it when restoring from state.

— Ken


> On May 4, 2020, at 6:41 PM, Ashish Pokharel <as...@yahoo.com> wrote:
> 
> [quoted original post trimmed; see the top of this thread]

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Restore from savepoint with Iterations

Posted by Ashish Pokharel <as...@yahoo.com>.
Could this be FLIP-15 related as well then?

> On May 4, 2020, at 9:41 PM, Ashish Pokharel <as...@yahoo.com> wrote:
> 
> [quoted original post trimmed; see the top of this thread]