You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Miguel E. Coimbra (Jira)" <ji...@apache.org> on 2019/09/25 17:51:00 UTC

[jira] [Updated] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs

     [ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Miguel E. Coimbra updated FLINK-10867:
--------------------------------------
    Description: 
*Motivation.* 

There are job scenarios where Flink batch processing users may be interested in processing a large amount of data, outputting results to disk and then reusing the results for another type of computation in Flink again.

This feature suggestion emerged from my work as a PhD researcher working on graph stream processing.

[https://arxiv.org/abs/1810.02781]

More specifically, in our use case this would be very useful to maintain an evolving graph while allowing for specific logic on challenges such as _when_ and _how_ to integrate updates in the graph and also how to represent it.

Furthermore, it would also be an enabler for rich use-cases that have synergy with this existing Jira issue pertaining graph partitioning:

FLINK-1536 - Graph partitioning operators for Gelly

*Problem.*

While it would be negligible to write the results to disk and then read them back in a new job to be sent to the JobManager if they are small, this becomes prohibitive if there are several gigabytes of data to write/read and using a distributed storage (e.g. HDFS) is not an option.

Even if there is a distributed storage available, as the number of sequential jobs increases, even the benefits of the secondary storage being distributed will diminish.

*Existing alternatives.*

I also considered, as a possibility, to compose the sequence of jobs in a single big job to submit to the JobManager, thus allowing reuse of results due to the natural forwarding of results to subsequent operators in dataflow programing.

However, this becomes difficult due to two reasons:
 * The logic to connect the sequence of jobs may depend on factors external to Flink and not known at the start of the job composition.
 This also excludes limited iterative behavior like what is provided in {{BulkIteration/DeltaIteration;}}
 ** Composing a job with "too many" operators and inter-dependencies may lead to the Optimizer engaging an exponential optimization search space.
 This is particularly true for operators with multiple valid execution strategies, leading to a combinatorics problem.
 This leads to the Flink compiler _taking forever_ to even create a plan.
 I believe this is the current situation based on a reply I received from [~fhueske] last year.
 His reply was on the 7th of December 2017:
 Link: [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
 Mailing list thread title: "Re: How to perform efficient DataSet reuse between iterations"

 

*Idea.*

Perhaps the better way to describe this *CacheOperator* feature is the concept of "_job chaining_", where a new type of DataSink would receive data that will:
 - Be available to a subsequent job which somehow makes a reference to the DataSink of the previous job;
 - Have remained available (from the previous job execution) in the exact same TaskManagers in the cluster.

Likely, the optimal memory distribution will be pretty similar between chained jobs - if the data was read from disk again between jobs, it would likely be distributed with the same (automatic or not) strategies, hence the same distribution would likely be of use to sequential jobs.

*Design.*

Potential conflicts with the current Flink cluster execution model:
 - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job finishes in local mode, so it would be necessary to change local mode to keep a FlinkMiniCluster alive - what was the reasoning behind destroying it?
 Simplifying the implementation?

 - How would this look like in the API?
 I envisioned an example like this:

{{DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
 {{CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();}}
 {{... // Other operations...}}
 {{environment.execute();}}
 {{... // The previous job has finished.}}
 {{DataSet<Tuple2<Long, Long>> sorted = op.sort(0); // the previous DataSet, which resulted from callSomeFlinkDataflowOperator() int the previous Flink job, remained in memory.}}
 {{environment.execute(); // Trigger a different job whose data depends on the previous one.}}

Besides adding appropriate classes to the Flink Java API, implementing this feature would require changing things so that:
 * JobManagers are aware that a completed job had cached operators - likely a new COMPLETED_AND_REUSABLE job state?
 * TaskManagers must keep references to the Flink memory management segments associated to the CacheOperator data;
 * CacheOperator must have a default number of usages and/or amount of time to be kept alive (I think both options should exist but the user may choose whether to use one or both);
 * Cluster coordination: should the JobManager be the entity that ultimately triggers the memory eviction order on the TaskManagers associated to a job with COMPLETED_AND_REUSABLE status?

 

*Related work.*

In Spark I believe the existing cache() operator does something similar to what I propose:

[https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]

[https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]]

 

  was:
*Motivation.*

There are job scenarios where Flink batch processing users may be interested in processing a large amount of data, outputting results to disk and then reusing the results for another type of computation in Flink again.

This feature suggestion emerged from my work as a PhD researcher working on graph stream processing.

[https://arxiv.org/abs/1810.02781]

More specifically, in our use case this would be very useful to maintain an evolving graph while allowing for specific logic on challenges such as _when_ and _how_ to integrate updates in the graph and also how to represent it.

Furthermore, it would also be an enabler for rich use-cases that have synergy with this existing Jira issue pertaining graph partitioning:

FLINK-1536 - Graph partitioning operators for Gelly

*Problem.*

While it would be negligible to write the results to disk and then read them back in a new job to be sent to the JobManager if they are small, this becomes prohibitive if there are several gigabytes of data to write/read and using a distributed storage (e.g. HDFS) is not an option.

Even if there is a distributed storage available, as the number of sequential jobs increases, even the benefits of the secondary storage being distributed will diminish.

*Existing alternatives.*

I also considered, as a possibility, to compose the sequence of jobs in a single big job to submit to the JobManager, thus allowing reuse of results due to the natural forwarding of results to subsequent operators in dataflow programing.

However, this becomes difficult due to two reasons:
 * The logic to connect the sequence of jobs may depend on factors external to Flink and not known at the start of the job composition.
 This also excludes limited iterative behavior like what is provided in {{BulkIteration/DeltaIteration;}}
 ** Composing a job with "too many" operators and inter-dependencies may lead to the Optimizer engaging an exponential optimization search space.
 This is particularly true for operators with multiple valid execution strategies, leading to a combinatorics problem.
 This leads to the Flink compiler _taking forever_ to even create a plan.
 I believe this is the current situation based on a reply I received from [~fhueske] last year.
 His reply was on the 7th of December 2017:
 Link: [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
 Mailing list thread title: "Re: How to perform efficient DataSet reuse between iterations"

 

*Idea.*

Perhaps the better way to describe this *CacheOperator* feature is the concept of "_job chaining_", where a new type of DataSink would receive data that will:
 - Be available to a subsequent job which somehow makes a reference to the DataSink of the previous job;
 - Have remained available (from the previous job execution) in the exact same TaskManagers in the cluster.

Likely, the optimal memory distribution will be pretty similar between chained jobs - if the data was read from disk again between jobs, it would likely be distributed with the same (automatic or not) strategies, hence the same distribution would likely be of use to sequential jobs.

*Design.*

Potential conflicts with the current Flink cluster execution model:
 - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job finishes in local mode, so it would be necessary to change local mode to keep a FlinkMiniCluster alive - what was the reasoning behind destroying it?
 Simplifying the implementation?

 - How would this look like in the API?
 I envisioned an example like this:

{{DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
 {{CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();}}
 {{... // Other operations...}}
 {{environment.execute();}}
 {{... // The previous job has finished.}}
 {{DataSet<Tuple2<Long, Long>> sorted = op.sort(0); // the previous DataSet, which resulted from callSomeFlinkDataflowOperator() int the previous Flink job, remained in memory.}}
 {{environment.execute(); // Trigger a different job whose data depends on the previous one.}}

Besides adding appropriate classes to the Flink Java API, implementing this feature would require changing things so that:
 * JobManagers are aware that a completed job had cached operators - likely a new COMPLETED_AND_REUSABLE job state?
 * TaskManagers must keep references to the Flink memory management segments associated to the CacheOperator data;
 * CacheOperator must have a default number of usages and/or amount of time to be kept alive (I think both options should exist but the user may choose whether to use one or both);
 * Cluster coordination: should the JobManager be the entity that ultimately triggers the memory eviction order on the TaskManagers associated to a job with COMPLETED_AND_REUSABLE status?

 

*Related work.*

In Spark I believe the existing cache() operator does something similar to what I propose:

[https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]

[https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]]

 


> Add a DataSet-based CacheOperator to reuse results between jobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-10867
>                 URL: https://issues.apache.org/jira/browse/FLINK-10867
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataSet, Runtime / Coordination, Runtime / Task
>    Affects Versions: 1.8.0
>            Reporter: Miguel E. Coimbra
>            Priority: Major
>
> *Motivation.* 
> There are job scenarios where Flink batch processing users may be interested in processing a large amount of data, outputting results to disk and then reusing the results for another type of computation in Flink again.
> This feature suggestion emerged from my work as a PhD researcher working on graph stream processing.
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful to maintain an evolving graph while allowing for specific logic on challenges such as _when_ and _how_ to integrate updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use-cases that have synergy with this existing Jira issue pertaining graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While it would be negligible to write the results to disk and then read them back in a new job to be sent to the JobManager if they are small, this becomes prohibitive if there are several gigabytes of data to write/read and using a distributed storage (e.g. HDFS) is not an option.
> Even if there is a distributed storage available, as the number of sequential jobs increases, even the benefits of the secondary storage being distributed will diminish.
> *Existing alternatives.*
> I also considered, as a possibility, to compose the sequence of jobs in a single big job to submit to the JobManager, thus allowing reuse of results due to the natural forwarding of results to subsequent operators in dataflow programing.
> However, this becomes difficult due to two reasons:
>  * The logic to connect the sequence of jobs may depend on factors external to Flink and not known at the start of the job composition.
>  This also excludes limited iterative behavior like what is provided in {{BulkIteration/DeltaIteration;}}
>  ** Composing a job with "too many" operators and inter-dependencies may lead to the Optimizer engaging an exponential optimization search space.
>  This is particularly true for operators with multiple valid execution strategies, leading to a combinatorics problem.
>  This leads to the Flink compiler _taking forever_ to even create a plan.
>  I believe this is the current situation based on a reply I received from [~fhueske] last year.
>  His reply was on the 7th of December 2017:
>  Link: [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
>  Mailing list thread title: "Re: How to perform efficient DataSet reuse between iterations"
>  
> *Idea.*
> Perhaps the better way to describe this *CacheOperator* feature is the concept of "_job chaining_", where a new type of DataSink would receive data that will:
>  - Be available to a subsequent job which somehow makes a reference to the DataSink of the previous job;
>  - Have remained available (from the previous job execution) in the exact same TaskManagers in the cluster.
> Likely, the optimal memory distribution will be pretty similar between chained jobs - if the data was read from disk again between jobs, it would likely be distributed with the same (automatic or not) strategies, hence the same distribution would likely be of use to sequential jobs.
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
>  - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job finishes in local mode, so it would be necessary to change local mode to keep a FlinkMiniCluster alive - what was the reasoning behind destroying it?
>  Simplifying the implementation?
>  - How would this look like in the API?
>  I envisioned an example like this:
> {{DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
>  {{CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();}}
>  {{... // Other operations...}}
>  {{environment.execute();}}
>  {{... // The previous job has finished.}}
>  {{DataSet<Tuple2<Long, Long>> sorted = op.sort(0); // the previous DataSet, which resulted from callSomeFlinkDataflowOperator() int the previous Flink job, remained in memory.}}
>  {{environment.execute(); // Trigger a different job whose data depends on the previous one.}}
> Besides adding appropriate classes to the Flink Java API, implementing this feature would require changing things so that:
>  * JobManagers are aware that a completed job had cached operators - likely a new COMPLETED_AND_REUSABLE job state?
>  * TaskManagers must keep references to the Flink memory management segments associated to the CacheOperator data;
>  * CacheOperator must have a default number of usages and/or amount of time to be kept alive (I think both options should exist but the user may choose whether to use one or both);
>  * Cluster coordination: should the JobManager be the entity that ultimately triggers the memory eviction order on the TaskManagers associated to a job with COMPLETED_AND_REUSABLE status?
>  
> *Related work.*
> In Spark I believe the existing cache() operator does something similar to what I propose:
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]
> [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)