Posted to user@flink.apache.org by lalala <La...@activist.com> on 2020/11/15 09:11:44 UTC

Dynamic ad hoc query deployment strategy

Hi all,

I would like to consult with you regarding deployment strategies.

We have 250+ Kafka topics, and we want users of the platform to submit SQL
queries against them that will run indefinitely. We have a query parser that
extracts topic names from user queries, and the application locally creates
the Kafka tables and executes the query. The results can be written to
multiple sinks such as databases, files, and cloud services.

We want the best possible isolation between queries, so that a failure in one
job does not affect the others. We have a huge YARN cluster that handles
about 1 PB a day from Kafka. I believe a cluster-per-job type of deployment
makes sense for the sake of isolation. However, that creates some scalability
problems. Several SQL queries might run on the same Kafka topic, and we do
not want to read that topic again for each query in a different session. The
ideal case is that we read the topic once and execute multiple queries on
this data to avoid rereading the same topic. That goes against the goal of a
fully isolated system, but it improves network and Kafka performance and
still provides isolation at the topic level, as we read each topic just once
and execute multiple SQL queries on it.
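
To make the grouping idea concrete, here is a minimal sketch in plain Python
(no Flink dependency) that buckets submitted queries by the set of topics they
read, so that each bucket could be deployed as one unit that reads its topics
once. The regex-based `extract_topics` helper, the query ids, and the query
texts are hypothetical stand-ins for the real query parser, and the sketch
assumes that table names map one-to-one to topic names.

```python
import re
from collections import defaultdict

# Hypothetical stand-in for the real query parser: picks up the identifier
# that follows FROM or JOIN. A production parser would use a SQL grammar.
TABLE_REF = re.compile(r"\b(?:FROM|JOIN)\s+([A-Za-z_][A-Za-z0-9_]*)", re.IGNORECASE)


def extract_topics(query: str) -> frozenset:
    """Return the set of table (topic) names referenced by a query."""
    return frozenset(name.lower() for name in TABLE_REF.findall(query))


def group_by_topics(queries: dict) -> dict:
    """Bucket query ids by the exact set of topics they read."""
    groups = defaultdict(list)
    for query_id, sql in queries.items():
        groups[extract_topics(sql)].append(query_id)
    return dict(groups)


# Made-up example queries, for illustration only.
queries = {
    "q1": "SELECT user_id FROM clicks WHERE country = 'DE'",
    "q2": "SELECT COUNT(*) FROM clicks",
    "q3": "SELECT * FROM orders o JOIN payments p ON o.id = p.order_id",
}
groups = group_by_topics(queries)
```

Grouping by the exact topic set keeps a failure contained to the queries that
share those topics, which is the topic-level isolation described above; a
coarser or finer grouping key is equally possible.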

We are quite new to Flink, but we have experience with Spark. In Spark, we
can submit an application that, on the master, listens to a query queue and
submits jobs to the cluster dynamically from different threads. However, in
Flink, it looks like main() has to produce the job graph in advance.

We do use an EMR cluster; what would you recommend for my use case?

Thank you.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Dynamic ad hoc query deployment strategy

Posted by Kostas Kloudas <kk...@gmail.com>.
I am also cc'ing Timo to see if he has anything more to add on this.

Cheers,
Kostas

On Thu, Nov 19, 2020 at 9:41 PM Kostas Kloudas <kk...@gmail.com> wrote:
> [...]

Re: Dynamic ad hoc query deployment strategy

Posted by lalala <La...@activist.com>.
Hi Timo and Dawid,

Thank you for the detailed answer; it looks like we need to reconsider our
entire job submission flow.

What is the best way to compare the new job graph with the old one? Can we
use the Flink plan visualizer to verify that the new job graph shares the
table, since you mention that this is not guaranteed?

Best regards,




Re: Dynamic ad hoc query deployment strategy

Posted by Timo Walther <tw...@apache.org>.
I agree with Dawid.

Maybe one thing to add is that reusing parts of the pipeline is possible
via StatementSets in TableEnvironment. They allow you to add multiple
queries that consume from a common part of the pipeline (for example, a
common source). However, all of that is compiled into one big job that is
static at runtime, so the queries are not isolated from each other.
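
A rough sketch of the StatementSet approach, written against PyFlink and
assuming a recent release in which `TableEnvironment.create`,
`create_statement_set`, `add_insert_sql`, and `execute` are available. The
table names, the predicates, and the `build_inserts` / `submit_as_one_job`
helpers are made up for illustration, and the CREATE TABLE statements are left
to the caller because the thread does not show them.

```python
def build_inserts(source_table: str, sinks: dict) -> list:
    """Build one INSERT statement per sink, all reading the same source table."""
    return [
        f"INSERT INTO {sink} SELECT * FROM {source_table} WHERE {predicate}"
        for sink, predicate in sinks.items()
    ]


def submit_as_one_job(ddl_statements: list, insert_statements: list):
    """Submit all INSERTs as a single job via a StatementSet (needs apache-flink)."""
    # Imported lazily so that build_inserts works without PyFlink installed.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    table_env = TableEnvironment.create(settings)
    for ddl in ddl_statements:  # CREATE TABLE ... for the source and the sinks
        table_env.execute_sql(ddl)
    statement_set = table_env.create_statement_set()
    for insert in insert_statements:
        statement_set.add_insert_sql(insert)
    return statement_set.execute()  # all statements are compiled into one job


# Hypothetical tables and predicates, for illustration only.
inserts = build_inserts(
    "clicks",
    {"clicks_de": "country = 'DE'", "clicks_mobile": "device = 'mobile'"},
)
```

Calling `submit_as_one_job(ddl, inserts)` would submit both INSERTs as one
job, so adding or removing a query means stopping and resubmitting the whole
set.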

One option is to introduce an additional Flink job that multiplexes the 
source Kafka topic into more Kafka topics such that isolated jobs can 
access this intermediate storage.
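
One way to picture the multiplexing job is as a router that reads every source
record once and copies it to each intermediate topic whose rule matches. The
sketch below is a plain-Python model of that routing logic, not Flink code;
the topic names, the record layout, and the predicate-based routing rule are
assumptions chosen for illustration.

```python
def multiplex(records, routes):
    """Read each source record once and copy it to every matching intermediate topic."""
    intermediate = {topic: [] for topic in routes}
    for record in records:  # single pass over the source topic
        for topic, matches in routes.items():
            if matches(record):
                intermediate[topic].append(record)
    return intermediate


# Made-up records and routing rules.
source_records = [
    {"user": "u1", "country": "DE"},
    {"user": "u2", "country": "US"},
    {"user": "u3", "country": "DE"},
]
routes = {
    "clicks.de": lambda r: r["country"] == "DE",  # read by one isolated job
    "clicks.all": lambda r: True,                 # read by another isolated job
}
intermediate = multiplex(source_records, routes)
```

The isolated jobs then read only from their own intermediate topic, at the
cost of writing the data to Kafka a second time.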

I hope this helps.

Regards,
Timo

On 24.11.20 16:54, Dawid Wysakowicz wrote:
> [...]


Re: Dynamic ad hoc query deployment strategy

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

Really sorry for the late reply.

To the best of my knowledge, it is not possible to "attach" to
a source/reader of a different job. Every job would read the source
separately.

> `The GenericInMemoryCatalog is an in-memory
> implementation of a catalog. All objects will be available only for the
> lifetime of the session.` I presume, in session mode, we can share Kafka
> source for multiple SQL jobs?

Unfortunately, this is a wrong assumption. Catalogs store the metadata of
tables, such as connection parameters, schema, etc., not the data itself
or parts of the graph. The information from a catalog can be used to
create an execution graph that can be submitted to a cluster. It has
nothing to do with a session cluster. The "session" here means a job, that
is, the lifetime of the GenericInMemoryCatalog.
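
The distinction can be shown with a toy model in plain Python. It is not the
Flink catalog API: `TableMetadata`, `JobGraph`, and `plan_job` are
hypothetical names, and the only point is that two jobs planned from the same
catalog entry each end up with their own reader.

```python
from dataclasses import dataclass, field
from itertools import count


@dataclass(frozen=True)
class TableMetadata:
    """What a catalog holds for a table: connection settings and schema only."""
    connector: str
    topic: str
    schema: tuple


@dataclass
class JobGraph:
    """Toy job graph: every job instantiates its own readers."""
    query: str
    readers: list = field(default_factory=list)


_reader_ids = count(1)


def plan_job(catalog: dict, table: str, query: str) -> JobGraph:
    """Look up metadata in the catalog and create a fresh reader for this job."""
    meta = catalog[table]  # metadata lookup only, no data and no shared operator
    reader = f"reader-{next(_reader_ids)}:{meta.topic}"
    return JobGraph(query=query, readers=[reader])


# Made-up catalog entry for the table A used in the thread.
catalog = {"A": TableMetadata("kafka", "topic-a", (("id", "BIGINT"),))}
job1 = plan_job(catalog, "A", "SELECT * FROM A")
job2 = plan_job(catalog, "A", "SELECT id FROM A")
```

Both jobs resolve the same metadata, yet `job1.readers` and `job2.readers`
differ, which mirrors every job reading the source separately.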

> Both queries will share the same reader as they are part of a single job
> graph. Can we somehow take a snapshot of this and submit another query with
> them again under the same job graph?

Again, unfortunately, there is no guarantee that this will work. As of now,
it is a limitation of SQL that it does not support stateful upgrades of a
graph or of the Flink version. As Till said, if the plan contains the same
sub-plans, the state should be able to match. However, with such extensive
changes to the graph, I would not count on that happening. It can work for
simpler changes such as changing a predicate (but even that can greatly
affect the plan if the predicate could have been optimized).
There have been, and still are, discussions about improving the
situation here.

A proper solution to this problem for a STREAMING job would be rather
hard, in my opinion, as we would have to somehow keep the state of the
shared source across multiple different jobs. We would need to know,
for example, the offsets that a certain job had consumed up to a certain
checkpoint, and we would have to decide what to do if a particular query
requested to start reading from offsets in the past.

There is an ongoing effort to support caching queries that could be
reused between jobs in the same cluster, as part of better support for
interactive programming [1], but I don't think it will support
STREAMING mode.

Just as an aside: I am not a Spark expert and I might be completely wrong,
but as far as I know, Spark also does not support dynamically reusing
streaming sources. It does implement caching of intermediate shuffles,
which is something FLIP-36 resembles.

Best regards,

Dawid

[1] https://cwiki.apache.org/confluence/x/8hclBg

On 23/11/2020 21:09, lalala wrote:
> [...]

Re: Dynamic ad hoc query deployment strategy

Posted by lalala <La...@activist.com>.
Hi Till,

Thank you for your comment. I am looking forward to hearing from Timo and
Dawid as well.

Best regards,




Re: Dynamic ad hoc query deployment strategy

Posted by Till Rohrmann <tr...@apache.org>.
Hi Lalala,

I think this approach can work as long as the generated query plan contains
the same sub-plan for the previous queries as before. Otherwise, Flink won't
be able to match the state to the operators of the plan. I think Timo and
Dawid should know for certain whether this is possible or not.
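
To illustrate why the sub-plan has to stay the same, here is a deliberately
simplified model in plain Python. It assumes that saved state is looked up by
an operator id and that the id is derived from the operator's description and
the ids of its inputs; Flink's actual id generation differs in detail, and the
plans, operator descriptions, and helper names below are invented.

```python
import hashlib


def operator_ids(plan: dict) -> dict:
    """Derive an id per operator from its description and its inputs' ids.

    `plan` maps operator name -> (description, [input names]) and must be
    listed in topological order.
    """
    ids = {}
    for name, (description, inputs) in plan.items():
        digest = hashlib.sha256(description.encode())
        for upstream in inputs:
            digest.update(ids[upstream].encode())
        ids[name] = digest.hexdigest()[:12]
    return ids


def restorable(old_plan: dict, new_plan: dict) -> set:
    """Operators of new_plan whose id also appears in the snapshot of old_plan."""
    saved = set(operator_ids(old_plan).values())
    return {name for name, op_id in operator_ids(new_plan).items() if op_id in saved}


old_plan = {
    "source": ("KafkaSource(A)", []),
    "q1": ("Aggregate(count by user)", ["source"]),
}
# Adding a query while the existing sub-plan stays byte-for-byte the same.
same_sub_plan = dict(old_plan, q2=("Aggregate(sum by country)", ["source"]))
# Adding a query makes the optimizer rewrite the shared source (hypothetical).
changed_sub_plan = {
    "source": ("KafkaSource(A, projection=[user, country])", []),
    "q1": ("Aggregate(count by user)", ["source"]),
    "q2": ("Aggregate(sum by country)", ["source"]),
}
kept = restorable(old_plan, same_sub_plan)
lost = restorable(old_plan, changed_sub_plan)
```

In this model `kept` contains the old source and query, while `lost` is empty:
a change to the shared source propagates to everything downstream of it.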

Cheers,
Till

On Mon, Nov 23, 2020 at 10:33 AM lalala <La...@activist.com> wrote:
> [...]

Re: Dynamic ad hoc query deployment strategy

Posted by lalala <La...@activist.com>.
Hi Kostas,

Yes, that would satisfy my use case as the platform is always
future-oriented. Any arbitrary query is executed on the latest data.

From your comment, I understand that even session mode does not optimize
our readers. I wish Flink could support arbitrary job submission and graph
generation at runtime, so we could submit jobs dynamically from main() as we
do in Spark.

If we want to group similar jobs, what would you recommend for arbitrary
long-running jobs? Can we somehow take a snapshot of the queries running
under a job graph and then resubmit them together with the new query?

I assume that if we do the following under a single job (main method):

'''
Source: create table A...
Query 1: select * from A
Query 2: select * from A
'''

Both queries will share the same reader as they are part of a single job
graph. Can we somehow take a snapshot of this and submit another query with
them again under the same job graph?

I really appreciate you taking the time to answer my questions.

Best.





Re: Dynamic ad hoc query deployment strategy

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Lalala,

Even in session mode, the job graph is created before the job is
executed, so all of the above still holds.
Although I am not super familiar with the catalogs, what you want is
for two or more jobs to share the same readers of a source. This is not
done automatically in DataStream or DataSet, and I am pretty sure that
Table and SQL do not perform any cross-query optimization either.

In addition, even if they did, are you sure that this would be enough
for your queries? The users will submit their queries at any point in
time, which means that each query would start processing from
wherever the reader is at that point in time, which is arbitrary. Is this
something that satisfies your requirements?
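
The concern about arbitrary start positions can be seen in a small simulation.
This is a plain-Python toy, not a Flink API: the `SharedReader` class, the
query ids, and the record values are all hypothetical.

```python
class SharedReader:
    """Toy model of one reader whose output is shared by several queries."""

    def __init__(self, records):
        self._records = list(records)
        self.position = 0
        self._outputs = {}

    def attach(self, query_id):
        """A query attached now only receives records read from now on."""
        self._outputs[query_id] = []

    def poll(self, n=1):
        """Advance the reader and hand each new record to every attached query."""
        batch = self._records[self.position:self.position + n]
        self.position += len(batch)
        for output in self._outputs.values():
            output.extend(batch)
        return batch

    def seen_by(self, query_id):
        return list(self._outputs[query_id])


reader = SharedReader(["r0", "r1", "r2", "r3", "r4"])
reader.attach("early")  # submitted before any data was read
reader.poll(3)          # the reader advances to position 3
reader.attach("late")   # submitted at an arbitrary later point in time
reader.poll(2)
```

Here the "late" query never sees r0 to r2, which is only acceptable if every
query is meant to run on the latest data.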

I will also include Dawid in the discussion to see if he has anything
to add about the Table API and SQL.

Cheers,
Kostas

On Fri, Nov 20, 2020 at 7:47 PM lalala <La...@activist.com> wrote:
> [...]

Re: Dynamic ad hoc query deployment strategy

Posted by lalala <La...@activist.com>.
Hi Kostas,

Thank you for your response.

Is what you are saying also valid for session mode? If I submit my jobs to
an existing Flink session, will they be able to share the sources?

We register our Kafka tables in `GenericInMemoryCatalog`, and the
documentation says `The GenericInMemoryCatalog is an in-memory
implementation of a catalog. All objects will be available only for the
lifetime of the session.` I presume that, in session mode, we can share a
Kafka source across multiple SQL jobs?

That is not what we wanted for the best isolation, but if full isolation is
not possible with Flink, we are also fine with session mode.

Best regards,






Re: Dynamic ad hoc query deployment strategy

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi,

Thanks for reaching out!

First of all, I would like to point out that an interesting
alternative to the per-job cluster could be running your jobs in
application mode [1].

Given that you want to run arbitrary SQL queries, I do not think you
can "share" across queries the part of the job graph that reads a
topic. In general, Flink (not only in SQL) creates the graph of a job
before the job is executed. And especially in SQL you do not even have
control over the graph, as the translation logic from query to
physical operators is opaque and not exposed to the user.

That said, you may want to have a look at [2]. It is pretty old, but it
describes a potentially similar use case. Unfortunately, it does not
support SQL.

Cheers,
Kostas

[1] https://flink.apache.org/news/2020/07/14/application-mode.html
[2] https://www.ververica.com/blog/rbea-scalable-real-time-analytics-at-king

On Sun, Nov 15, 2020 at 10:11 AM lalala <La...@activist.com> wrote:
> [...]