Posted to user@flink.apache.org by Pilgrim Beart <pi...@devicepilot.com> on 2021/02/08 18:22:46 UTC

Any plans to make Flink configurable with pure data?

To a naive Flink newcomer (me) it's a little surprising that there is no
pure "data" mechanism for specifying a Flink pipeline, only "code"
interfaces. With the DataStream interface I can use Java, Scala or Python
to set up a pipeline and then execute it - but that doesn't really
seem to *need* a programming model; it seems like configuration, which could
be done with data? OK, one does occasionally need to specify some custom code, e.g. a
ProcessFunction, but for any given use-case, a relatively static library of
such functions would seem fine.

My use case is that I have lots of customers, and I'm doing a similar job
for each of them, so I'd prefer to have a library of common code (e.g.
ProcessFunctions), and then specify each customer's specific requirements
in a single config file.  To do that in Java, I'd have to do
metaprogramming (to build various pieces of Java out of that config file).

Flink SQL seems to be the closest solution, but doesn't appear to support
fundamental Flink concepts such as timers (?). Is there a plan to evolve
Flink SQL to support timers? My specific need is timeouts.

Thanks,

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot

Re: Any plans to make Flink configurable with pure data?

Posted by Arvid Heise <ar...@apache.org>.
Hi Pilgrim,

Thank you for clarifying. I solved a similar challenge in Spark (for batch)
by doing the following (translated into Flink terms):
- I created most of the application with the Table API - the programmatic
interface to Flink SQL. Here it is quite easy to implement structural
variance of similar queries. The fields and types can be dynamically set
through configuration that is injected while composing the query.
- Configuration was specified in some CSV (the users were Excel-fanatics)
and specific to the domain. (Otherwise I'd recommend HOCON.)
- For low level constructs (e.g. timers), you can convert a Table to a
DataStream and use ProcessFunctions and then convert back to Table. [1]
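Arvid's first point — a fixed query shape with the varying fields injected from per-customer configuration — might look roughly like this Python sketch. The table and field names are invented for illustration; a real job would hand the composed string to the Table API (e.g. `sqlQuery`):

```python
# Sketch: the query *shape* is fixed, while fields and source table are
# injected from per-customer configuration when the query is composed.
# All names (tables, fields, config keys) are hypothetical examples.

def compose_query(config: dict) -> str:
    """Build one instance of the shared query shape from a customer config."""
    fields = ", ".join(config["group_fields"])
    return (
        f"SELECT {fields}, COUNT(*) AS event_count "
        f"FROM {config['source_table']} "
        f"GROUP BY {fields}"
    )

customer_1 = {"source_table": "events_c1",
              "group_fields": ["device_id", "event_type"]}
print(compose_query(customer_1))
```

The compiled application never changes; only the injected configuration varies from customer to customer.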

With that approach the users were able to aggregate >100 different event
types from ~10 types of sources (the source type needed to be added once to
the application by me - mostly because the schema needed to be
incorporated).

The big advantage (and what I assume works in Flink as well) was that the
optimizer smartly grouped the aggregations, such that the number of
aggregation operators solely depended on the number of unique sources.

I'm happy to provide more details.

Arvid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset

On Thu, Feb 11, 2021 at 3:51 PM Pilgrim Beart <pi...@devicepilot.com>
wrote:


Re: Any plans to make Flink configurable with pure data?

Posted by Pilgrim Beart <pi...@devicepilot.com>.
Hi Arvid, thanks for the response. I don't think I was being very clear.

This is a streaming application. Each customer will have a Flink job
running in its own private cluster. The rough *shape* of the analysis that
the Flink job is doing for each customer is always the same, because we do
just one thing (IoT service management). So our DAG is always the same
shape.

But the specific requirements of each customer are slightly different: the
exact shape of their input data, the exact metrics they need to gather. So
for example, if they care about timeouts, then customer 1 might want 10
minute timeouts whenever message fields A or B change, but customer 2 might
want 2 day timeouts whenever message fields C or D change.

For maintainability it would make sense if we could encapsulate all the
code which is common to all customers into one standard Flink library, and
separate out the bits which are different somewhere else. Let's call that
"somewhere else" a config file. The config file then becomes a complete
specification of the transform. We can version it, and do CI/CD etc. easily
- and any enhancement or bug-fixing on our core Flink library can be
inherited by all customers.

Part of the challenge is Java's strong typing - the messages flowing
through Flink always have a similar shape, but their exact fields will be
customer-specific. Java needs to know about this at compile time, which is
fine. And simple numeric parameters, such as e.g. timeout intervals, can be
pulled from a config file at runtime.
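The runtime-configurable part can indeed be no more than a small config file read at job startup — a minimal sketch, with hypothetical keys:

```python
import json

# Hypothetical per-customer config: simple numeric parameters such as
# timeout intervals are read at startup without recompiling the job.
raw = '{"timeout_minutes": 10, "watch_fields": ["A", "B"]}'
config = json.loads(raw)

# Convert to the milliseconds a timer registration would typically expect.
timeout_ms = config["timeout_minutes"] * 60 * 1000
```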

But consider the above example: for one customer I want to set a timer when
fields A & B in the message change. For another customer I want to set a
timer when fields C & D change.  Because the only way I can set a timer is
with code, this means I need to write some code to do this. Which means
that my configuration has to be code. Code is infinitely powerful, but
infinitely complicated and dangerous. It's not really ideal to have to use
code in a configuration.

Ideally, I'd like my config file to say something like this to define
timers:
T1=timeout(A)
T2=timeout(B)

I could solve this with meta-programming:
a) Either I could hold my messages in an abstract type (e.g. my messages
could just be Maps), though that's pretty inefficient and seems like I'm
bypassing a lot of the benefits of Java's type safety and speed.
b) Or I could take my specification and turn it into a Java class before I
compile my Flink job. But that seems a bit ugly?
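Option (a) might be sketched like this: messages held as plain dicts (Maps), the timer definitions parsed from the config syntax above, and a generic check for which timers to register when a message arrives. All names are hypothetical, and in a real job the change check would live inside a keyed ProcessFunction:

```python
import re

def parse_timer_config(text: str) -> dict:
    # "T1=timeout(A)" -> {"T1": "A"}: each timer watches one message field.
    timers = {}
    for line in text.strip().splitlines():
        m = re.fullmatch(r"(\w+)=timeout\((\w+)\)", line.strip())
        if m:
            timers[m.group(1)] = m.group(2)
    return timers

def timers_to_set(timers: dict, previous: dict, current: dict) -> list:
    # A timer would be (re)registered when its watched field changes value.
    return [name for name, field in timers.items()
            if previous.get(field) != current.get(field)]

timers = parse_timer_config("T1=timeout(A)\nT2=timeout(B)")
prev = {"A": 1, "B": 7}
curr = {"A": 2, "B": 7}   # only field A changed
```

The trade-off Pilgrim notes still applies: dict lookups replace typed field access, so errors surface at runtime rather than at compile time.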

Hope I've at least made clear the goal!

Thanks for any further thoughts,

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot



On Thu, 11 Feb 2021 at 11:03, Arvid Heise <ar...@apache.org> wrote:


Re: Any plans to make Flink configurable with pure data?

Posted by Arvid Heise <ar...@apache.org>.
Hi Pilgrim,

it sounds to me as if you are planning to use Flink for batch processing by
having some centralized server to which you submit your queries.

While you can use Flink SQL for that (and it's used in this way in larger
companies), the original idea of Flink was to be used in streaming
applications. For these applications, the query is not just configuration;
the query IS the application. For these applications, you'd usually spawn a
few K8s pods and let your application+Flink run. Hence, the many code
interfaces.

Even if you use SQL on streaming, you'd usually start a new ad-hoc cluster
for your application to have better isolation. There are quite a few
deployments that use YARN or Mesos to provide a large number of nodes which
are then used by a large number of Flink batch and streaming jobs, but I'd
usually not recommend that for newer users. I'd even go as far as to say
that most of these organizations wouldn't choose that stack if they were
creating a cluster now; they would instead go for a K8s solution.

On Tue, Feb 9, 2021 at 1:05 PM Yun Gao <yu...@aliyun.com> wrote:


Re: Any plans to make Flink configurable with pure data?

Posted by Yun Gao <yu...@aliyun.com>.
Hi Pilgrim,

Currently the Table API indeed cannot use low-level APIs like timers. Would a
mixture of SQL & DataStream satisfy the requirements? A job might be created
from multiple SQL queries and connected via DataStream operations.

Best,
 Yun
