Posted to user@flink.apache.org by Satyam Shekhar <sa...@gmail.com> on 2020/07/29 11:53:24 UTC

Colocating Compute

Hello,

I am using Flink v1.10 in a distributed environment to run SQL queries on
batch and streaming data.

In my setup, data is sharded and distributed across the cluster. Each shard
receives streaming updates from some external source. I wish to minimize
data movement during query evaluation for performance reasons. For that, I
need some construct to advise the Flink planner to bind each split (shard) to
the host where it is located.

I have come across InputSplitAssigner which gives me levers to influence
compute colocation for batch queries. Is there a way to do the same for
streaming queries as well?
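
For context, the batch-side lever mentioned above can be pictured roughly as
follows. This is a minimal standalone sketch, not Flink code: ShardSplit and
LocalFirstAssigner are hypothetical names, and only the method shape
getNextInputSplit(String host, int taskId) is borrowed from Flink's
InputSplitAssigner interface. It assumes each shard lives on exactly one
known host.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical split type: one shard plus the host that stores it. */
final class ShardSplit {
    final int shardId;
    final String host;

    ShardSplit(int shardId, String host) {
        this.shardId = shardId;
        this.host = host;
    }
}

/**
 * Standalone stand-in for a locality-aware assigner. It mirrors the shape of
 * InputSplitAssigner#getNextInputSplit but does not implement the Flink interface.
 */
final class LocalFirstAssigner {
    private final List<ShardSplit> unassigned;

    LocalFirstAssigner(List<ShardSplit> splits) {
        this.unassigned = new ArrayList<>(splits);
    }

    /** Returns a split stored on the requesting host if one is left, else any remaining split, else null. */
    synchronized ShardSplit getNextInputSplit(String host, int taskId) {
        for (Iterator<ShardSplit> it = unassigned.iterator(); it.hasNext(); ) {
            ShardSplit split = it.next();
            if (split.host.equals(host)) {
                it.remove();
                return split;
            }
        }
        // No local split left: hand out a remote one so that no shard goes unread.
        return unassigned.isEmpty() ? null : unassigned.remove(0);
    }
}
```

The fallback to a remote split is a design choice of this sketch. A strict
variant would return null instead, trading completeness for guaranteed
locality.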

Regards,
Satyam

Re: Colocating Compute

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Satyam,

It should be fine to have an unbounded InputFormat. The important thing is
not to produce more splits than there are parallel instances of your
source. In createInputSplits(int minNumSplits), generate only
minNumSplits splits, so that all splits can be assigned immediately.
Unfortunately, you won't have access to state in an InputFormat. Now
that I think about it, this will be problematic with checkpoints, as you
cannot store the offset up to which you have read the split.
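
As a rough illustration of the "generate only minNumSplits" advice, the
sketch below packs shard ids into exactly minNumSplits groups. It is plain
Java rather than a real InputFormat: SplitPlanner is a hypothetical name, the
shards are assumed to be identified by integers, and round-robin grouping is
just one possible policy.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitPlanner {
    /**
     * Packs the given shard ids into exactly minNumSplits groups, round-robin.
     * A group stays empty when there are fewer shards than requested splits.
     */
    static List<List<Integer>> createInputSplits(int minNumSplits, List<Integer> shardIds) {
        if (minNumSplits <= 0) {
            throw new IllegalArgumentException("minNumSplits must be positive");
        }
        List<List<Integer>> splits = new ArrayList<>();
        for (int i = 0; i < minNumSplits; i++) {
            splits.add(new ArrayList<>());
        }
        for (int i = 0; i < shardIds.size(); i++) {
            splits.get(i % minNumSplits).add(shardIds.get(i));
        }
        return splits;
    }
}
```

Because the number of groups never exceeds the requested count, every group
can be handed to a source instance right away and none is left waiting for a
reader to finish an unbounded split.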

In the SourceFunction stack, as far as I know, there is no built-in
support for that. As an alternative, you could maybe build the split
assignment into the SourceFunction itself. Unfortunately, as it would not
happen in a single location, you would have to ensure that the logic can
assign all the splits independently in each of the parallel instances of
the source.
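
One way to picture such independent assignment is a pure function that every
parallel instance evaluates on its own. The sketch below is plain Java under
assumptions: StaticShardAssignment is a hypothetical name, shards are
numbered 0..numShards-1, and the modulo rule is only an example. In a
RichParallelSourceFunction the two inputs would presumably come from
getRuntimeContext().getIndexOfThisSubtask() and
getRuntimeContext().getNumberOfParallelSubtasks(). Note that this rule alone
gives a consistent assignment, not host colocation.

```java
import java.util.ArrayList;
import java.util.List;

final class StaticShardAssignment {
    /**
     * Returns the shards owned by one parallel instance. Every instance calls
     * this with its own index; the results are disjoint and together cover
     * all shards, without any coordination between instances.
     */
    static List<Integer> shardsFor(int subtaskIndex, int parallelism, int numShards) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<Integer> mine = new ArrayList<>();
        for (int shard = subtaskIndex; shard < numShards; shard += parallelism) {
            mine.add(shard);
        }
        return mine;
    }
}
```

Since the function is deterministic, a restarted instance recomputes the
same shard set, which is what makes a decentralized assignment workable.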

The Split, SourceReader, and SplitEnumerator are new components
introduced in FLIP-27 [1]. I am not very familiar with those yet.
Unfortunately, those are not yet supported in the Table ecosystem. I also
don't know if it is possible to assign the splits based on the host
machine with them. I am cc'ing Stephan and Becket, who worked on those, to
check if it is already possible with the interfaces.

Best,

Dawid


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

On 31/07/2020 02:58, Satyam Shekhar wrote:
> Hi Dawid,
>
> I am currently on Flink v1.10. Do streaming pipelines support
> unbounded InputFormat in v1.10? My current setup uses SourceFunction
> for streaming pipelines and InputFormat for batch queries.
>
> I see the documentation for Flink v1.11 describes the concepts of Split,
> SourceReader, and SplitEnumerator to enable streaming queries on
> unbounded splits. Is that the direction you were pointing to?
>
> Regards,
> Satyam

Re: Colocating Compute

Posted by Satyam Shekhar <sa...@gmail.com>.
Hi Dawid,

I am currently on Flink v1.10. Do streaming pipelines support unbounded
InputFormat in v1.10? My current setup uses SourceFunction for streaming
pipelines and InputFormat for batch queries.

I see the documentation for Flink v1.11 describes the concepts of Split,
SourceReader, and SplitEnumerator to enable streaming queries on unbounded
splits. Is that the direction you were pointing to?

Regards,
Satyam

On Thu, Jul 30, 2020 at 6:03 AM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi Satyam,
>
> I think you can also use the InputSplitAssigner for streaming pipelines
> through an InputFormat. You can use
> StreamExecutionEnvironment#createInput or, for SQL, you can write your
> source according to the documentation here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#dynamic-table-source
>
> If you do not want to use an InputFormat I think there is no easy way to
> do it now.
>
> Best,
>
> Dawid

Re: Colocating Compute

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Satyam,

I think you can also use the InputSplitAssigner for streaming pipelines
through an InputFormat. You can use
StreamExecutionEnvironment#createInput or, for SQL, you can write your
source according to the documentation here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#dynamic-table-source

If you do not want to use an InputFormat, I think there is no easy way to
do it now.

Best,

Dawid

On 29/07/2020 13:53, Satyam Shekhar wrote:
> Hello,
>
> I am using Flink v1.10 in a distributed environment to run SQL queries
> on batch and streaming data.
>
> In my setup, data is sharded and distributed across the cluster. Each
> shard receives streaming updates from some external source. I wish to
> minimize data movement during query evaluation for performance
> reasons. For that, I need some construct to advise the Flink planner to
> bind each split (shard) to the host where it is located.
>
> I have come across InputSplitAssigner which gives me levers to
> influence compute colocation for batch queries. Is there a way to do
> the same for streaming queries as well? 
>
> Regards,
> Satyam