You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Marco Villalobos <mv...@kineteque.com> on 2020/08/04 20:34:02 UTC

Two Queries and a Kafka Topic

Lets say that I have:

SQL Query One from data in PostgreSQL (200K records).
SQL Query Two from data in PostgreSQL (1000 records).
and Kafka Topic One.

Let's also say that main data from this Flink job arrives in Kafka Topic One.

If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store it in Keyed State or Broadcast State, but it's not really part of the stream, then what is the best practice for supporting that in Flink

The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two to perform its business logic.

I am using Flink 1.10.

I supposed to query the database before the Job I submitted, and then pass it on as parameters to a function?
Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them two the main stream that uses Kafka Topic One?

I would appreciate guidance. Please.  Thank you.

Sincerely,

Marco A. Villalobos




Re: Two Queries and a Kafka Topic

Posted by Marco Villalobos <mv...@kineteque.com>.
HI Theo,

You've been very helpful actually, thank you.

What  mean by "tieing" DataSet and DataStream is that the documentation states that the DataSet can write a Save point, and the DataStream can read it, and another blog states that "You can create both Batch and Stream environment in a single job."

I think it is not possible for my use case, as state in Data-Stream job is passed on through command line parameters.

I did solve this problem though.  You can read it at https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream <https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream>

Again, thank you.

> On Aug 10, 2020, at 12:02 PM, Theo Diefenthal <th...@scoop-software.de> wrote:
> 
> Hi Marco,
> 
> Sadly, I myself haven't ever used StateProcessorAPI either. 
> 
> I thought, the documentation here [1] is rather straight forward to be used, but I never tried it myself.
> 
> Also, I don' get what you mean with "tieing" DataSet and DataStream? For what I understand, the StateProcessorAPI is internally upon DataSets at the moment. So if you have a Streaming job (DataStream), you would run your custom state migration program before even starting the streaming job using the StateProcessor API and initialize your state as a  DataSet. After all, initializing a state doesn't require you to have an infinite job running, but only a finite one (Thus batch/DataSet). Once that program finished execution, you would submit your streaming job starting from the written savepoint. I guess the API works with either HDFS or filesystem, but maybe someone who has already used the API might shed some more light for us here. 
> 
> Best regards
> Theo
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html <https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html>
> 
> Von: "Marco Villalobos" <mv...@kineteque.com>
> An: "Theo Diefenthal" <th...@scoop-software.de>
> CC: "user" <us...@flink.apache.org>
> Gesendet: Donnerstag, 6. August 2020 23:47:13
> Betreff: Re: Two Queries and a Kafka Topic
> 
> I am trying to use the State Processor API.  Does that require HDFS or a filesystem?
> I wish there was a complete example that ties in both DataSet and DataStream API, and the State Processor API.
> 
> So far I have not been able to get it to work.
> 
> Does anybody know where I can find examples of these type of techniques?
> 
> 
> 
> On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <theo.diefenthal@scoop-software.de <ma...@scoop-software.de>> wrote:
> Hi Marco,
> 
> In general, I see three solutions here you could approach: 
> 
> 1. Use the StateProcessorAPI: You can run a program with the stateProcessorAPI that loads the data from JDBC and stores it into a Flink SavePoint. Afterwards, you start your streaming job from that savepoint which will load its state and within find all the data from JDBC stored already. 
> 2. Load from master, distribute with the job: When you build up your jobgraph, you could execute the JDBC queries and put the result into some Serializable class which in turn you plug in a an operator in your stream (e.g. a map stage). The class along with all the queried data will be serialized and deserialized on the taskmanagers (Usually, I use this for configuration parameters, but it might be ok in this case as well)
> 3. Load from TaskManager: In your map-function, if the very first event is received, you can block processing and synchronously load the data from JDBC (So each Taskmanager performs the JDBC query itself). You then keep the data to be used for all subsequent map steps. 
> 
> I think, option 3 is the easiest to be implemented while option 1 might be the most elegant way in my opinion. 
> 
> Best regards
> Theo
> 
> Von: "Marco Villalobos" <mvillalobos@kineteque.com <ma...@kineteque.com>>
> An: "Leonard Xu" <xbjtdcq@gmail.com <ma...@gmail.com>>
> CC: "user" <user@flink.apache.org <ma...@flink.apache.org>>
> Gesendet: Mittwoch, 5. August 2020 04:33:23
> Betreff: Re: Two Queries and a Kafka Topic
> 
> Hi Leonard,
> 
> First, Thank you.
> 
> I am currently trying to restrict my solution to Apache Flink 1.10 because its the current version supported by Amazon EMR.
> i am not ready to change our operational environment to solve this.
> 
> Second, I am using the DataStream API.  The Kafka Topic is not in a table, it is in a DataStream.
> 
> The SQL queries are literally from a PostgresSQL database, and only need to be run exactly once in the lifetime of the job.
> 
> I am struggling to determine where this happens.
> 
> JDBCInputFormat seems to query the SQL table repetitively, and also connecting streams and aggregating into one object is very complicated.
> 
> Thus, I am wondering what is the right approach.  
> 
> Let me restate the parameters.
> 
> SQL Query One = data in PostgreSQL (200K records) that is used for business logic.
> SQL Query Two = data in PostgreSQL (1000 records) that is used for business logic.
> Kafka Topic One = unlimited data-stream that uses the data-stream api and queries above to write into multiple sinks
> 
> Asci Diagram:
> 
> [SQL Query One] ----> [Aggregate to Map]											
> 
> 									Kafka ----> [Kafka Topic One]  --- [Keyed Process Function (Query One Map, Query Two Map)] ---<[Multiple Sinks] 
> 
> [SQL Query Two] ---->[Aggregate to Map]
> 
> 
> Maybe my graph above helps.  You see, I need Query One and Query Two only ever execute once.  After that the information they provide are used to correctly process the Kafka Topic.
> 
> I'll take a deep further to try and understand what you said, thank you, but JDBCInputFormat seem to repetitively query the database.  Maybe I need to write a RichFunction or AsyncIO function and cache the results in state after that.
> 
> 
> 
> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjtdcq@gmail.com <ma...@gmail.com>> wrote:
> 
> Hi, Marco
> 
> If I need SQL Query One and SQL Query Two to happen just one time,
> 
> Looks like you want to reuse this kafka table in one job, It’s supported to execute multiple query in one sql job in Flink 1.11. 
> You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a single SQL job[1].
> 
> 
> Best
> Leonard 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement>
> 
> 
> 在 2020年8月5日,04:34,Marco Villalobos <mvillalobos@kineteque.com <ma...@kineteque.com>> 写道:
> 
> Lets say that I have:
> 
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
> 
> Let's also say that main data from this Flink job arrives in Kafka Topic One.
> 
> If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store it in Keyed State or Broadcast State, but it's not really part of the stream, then what is the best practice for supporting that in Flink
> 
> The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two to perform its business logic.
> 
> I am using Flink 1.10.
> 
> I supposed to query the database before the Job I submitted, and then pass it on as parameters to a function?
> Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them two the main stream that uses Kafka Topic One?
> 
> I would appreciate guidance. Please.  Thank you.
> 
> Sincerely,
> 
> Marco A. Villalobo


Re: Two Queries and a Kafka Topic

Posted by Austin Cawley-Edwards <au...@gmail.com>.
for each operator that needs the *data** 🤦‍♂️ *

On Mon, Aug 10, 2020 at 3:58 PM Austin Cawley-Edwards <
austin.cawley@gmail.com> wrote:

> Hey all,
>
> A bit late here and I’m not sure it’s totally valuable, but we have a
> similar job where we need to query an external data source on startup
> before processing the main stream as well.
>
> Instead of making that request in the Jobmanager process when building the
> graph, we make those requests from the operator “open()” methods, and then
> store it in broadcast state.
>
> Our queries aren’t that expensive to run, so we run multiple on startup
> for each operator that needs the fat to avoid a race condition. The
> blocking until the broadcast state is received downstream sounds like a
> reasonable way to do it to.
>
> Hope that helps a bit, or at least as another example!
>
> Best,
> Austin
>
> On Mon, Aug 10, 2020 at 3:03 PM Theo Diefenthal <
> theo.diefenthal@scoop-software.de> wrote:
>
>> Hi Marco,
>>
>> Sadly, I myself haven't ever used StateProcessorAPI either.
>>
>> I thought, the documentation here [1] is rather straight forward to be
>> used, but I never tried it myself.
>>
>> Also, I don' get what you mean with "tieing" DataSet and DataStream? For
>> what I understand, the StateProcessorAPI is internally upon DataSets at the
>> moment. So if you have a Streaming job (DataStream), you would run your
>> custom state migration program before even starting the streaming job using
>> the StateProcessor API and initialize your state as a  DataSet. After all,
>> initializing a state doesn't require you to have an infinite job running,
>> but only a finite one (Thus batch/DataSet). Once that program finished
>> execution, you would submit your streaming job starting from the written
>> savepoint. I guess the API works with either HDFS or filesystem, but maybe
>> someone who has already used the API might shed some more light for us
>> here.
>>
>> Best regards
>> Theo
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> ------------------------------
>> *Von: *"Marco Villalobos" <mv...@kineteque.com>
>> *An: *"Theo Diefenthal" <th...@scoop-software.de>
>> *CC: *"user" <us...@flink.apache.org>
>> *Gesendet: *Donnerstag, 6. August 2020 23:47:13
>>
>> *Betreff: *Re: Two Queries and a Kafka Topic
>>
>> I am trying to use the State Processor API.  Does that require HDFS or a
>> filesystem?
>> I wish there was a complete example that ties in both DataSet and
>> DataStream API, and the State Processor API.
>>
>> So far I have not been able to get it to work.
>>
>> Does anybody know where I can find examples of these type of techniques?
>>
>>
>>
>> On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <
>> theo.diefenthal@scoop-software.de> wrote:
>>
>>> Hi Marco,
>>>
>>> In general, I see three solutions here you could approach:
>>>
>>> 1. Use the StateProcessorAPI: You can run a program with the
>>> stateProcessorAPI that loads the data from JDBC and stores it into a Flink
>>> SavePoint. Afterwards, you start your streaming job from that savepoint
>>> which will load its state and within find all the data from JDBC stored
>>> already.
>>> 2. Load from master, distribute with the job: When you build up your
>>> jobgraph, you could execute the JDBC queries and put the result into some
>>> Serializable class which in turn you plug in a an operator in your stream
>>> (e.g. a map stage). The class along with all the queried data will be
>>> serialized and deserialized on the taskmanagers (Usually, I use this for
>>> configuration parameters, but it might be ok in this case as well)
>>> 3. Load from TaskManager: In your map-function, if the very first event
>>> is received, you can block processing and synchronously load the data from
>>> JDBC (So each Taskmanager performs the JDBC query itself). You then keep
>>> the data to be used for all subsequent map steps.
>>>
>>> I think, option 3 is the easiest to be implemented while option 1 might
>>> be the most elegant way in my opinion.
>>>
>>> Best regards
>>> Theo
>>>
>>> ------------------------------
>>> *Von: *"Marco Villalobos" <mv...@kineteque.com>
>>> *An: *"Leonard Xu" <xb...@gmail.com>
>>> *CC: *"user" <us...@flink.apache.org>
>>> *Gesendet: *Mittwoch, 5. August 2020 04:33:23
>>> *Betreff: *Re: Two Queries and a Kafka Topic
>>>
>>> Hi Leonard,
>>>
>>> First, Thank you.
>>>
>>> I am currently trying to restrict my solution to Apache Flink 1.10
>>> because its the current version supported by Amazon EMR.
>>> i am not ready to change our operational environment to solve this.
>>>
>>> Second, I am using the DataStream API.  The Kafka Topic is not in a
>>> table, it is in a DataStream.
>>>
>>> The SQL queries are literally from a PostgresSQL database, and only need
>>> to be run exactly once in the lifetime of the job.
>>>
>>> I am struggling to determine where this happens.
>>>
>>> JDBCInputFormat seems to query the SQL table repetitively, and also
>>> connecting streams and aggregating into one object is very complicated.
>>>
>>> Thus, I am wondering what is the right approach.
>>>
>>> Let me restate the parameters.
>>>
>>> SQL Query One = data in PostgreSQL (200K records) that is used for
>>> business logic.
>>> SQL Query Two = data in PostgreSQL (1000 records) that is used for
>>> business logic.
>>> Kafka Topic One = unlimited data-stream that uses the data-stream api
>>> and queries above to write into multiple sinks
>>>
>>> Asci Diagram:
>>>
>>> [SQL Query One] ----> [Aggregate to Map]
>>>
>>> Kafka ----> [Kafka Topic One]  --- [Keyed Process Function (Query One
>>> Map, Query Two Map)] ---<[Multiple Sinks]
>>>
>>> [SQL Query Two] ---->[Aggregate to Map]
>>>
>>>
>>> Maybe my graph above helps.  You see, I need Query One and Query Two
>>> only ever execute once.  After that the information they provide are used
>>> to correctly process the Kafka Topic.
>>>
>>> I'll take a deep further to try and understand what you said, thank you,
>>> but JDBCInputFormat seem to repetitively query the database.  Maybe I need
>>> to write a RichFunction or AsyncIO function and cache the results in state
>>> after that.
>>>
>>>
>>>
>>> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xb...@gmail.com> wrote:
>>>
>>> Hi, Marco
>>>
>>> If I need SQL Query One and SQL Query Two to happen just one time,
>>>
>>>
>>> Looks like you want to reuse this kafka table in one job, It’s supported
>>> to execute multiple query in one sql job in Flink 1.11.
>>> You can use `StatementSet`[1] to add SQL Query one and SQL query Two in
>>> a single SQL job[1].
>>>
>>>
>>> Best
>>> Leonard
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
>>>
>>>
>>> 在 2020年8月5日,04:34,Marco Villalobos <mv...@kineteque.com> 写道:
>>>
>>> Lets say that I have:
>>>
>>> SQL Query One from data in PostgreSQL (200K records).
>>> SQL Query Two from data in PostgreSQL (1000 records).
>>> and Kafka Topic One.
>>>
>>> Let's also say that main data from this Flink job arrives in Kafka Topic
>>> One.
>>>
>>> If I need SQL Query One and SQL Query Two to happen just one time, when
>>> the job starts up, and afterwards maybe store it in Keyed State or
>>> Broadcast State, but it's not really part of the stream, then what is the
>>> best practice for supporting that in Flink
>>>
>>> The Flink job needs to stream data from Kafka Topic One, aggregate it,
>>> and perform computations that require all of the data in SQL Query One and
>>> SQL Query Two to perform its business logic.
>>>
>>> I am using Flink 1.10.
>>>
>>> I supposed to query the database before the Job I submitted, and then
>>> pass it on as parameters to a function?
>>> Or am I supposed to use JDBCInputFormat for both queries and create two
>>> streams, and somehow connect or broadcast both of them two the main stream
>>> that uses Kafka Topic One?
>>>
>>> I would appreciate guidance. Please.  Thank you.
>>>
>>> Sincerely,
>>>
>>> Marco A. Villalobo
>>>
>>>

Re: Two Queries and a Kafka Topic

Posted by Austin Cawley-Edwards <au...@gmail.com>.
Hey all,

A bit late here and I’m not sure it’s totally valuable, but we have a
similar job where we need to query an external data source on startup
before processing the main stream as well.

Instead of making that request in the Jobmanager process when building the
graph, we make those requests from the operator “open()” methods, and then
store it in broadcast state.

Our queries aren’t that expensive to run, so we run multiple on startup for
each operator that needs the fat to avoid a race condition. The blocking
until the broadcast state is received downstream sounds like a reasonable
way to do it to.

Hope that helps a bit, or at least as another example!

Best,
Austin

On Mon, Aug 10, 2020 at 3:03 PM Theo Diefenthal <
theo.diefenthal@scoop-software.de> wrote:

> Hi Marco,
>
> Sadly, I myself haven't ever used StateProcessorAPI either.
>
> I thought, the documentation here [1] is rather straight forward to be
> used, but I never tried it myself.
>
> Also, I don' get what you mean with "tieing" DataSet and DataStream? For
> what I understand, the StateProcessorAPI is internally upon DataSets at the
> moment. So if you have a Streaming job (DataStream), you would run your
> custom state migration program before even starting the streaming job using
> the StateProcessor API and initialize your state as a  DataSet. After all,
> initializing a state doesn't require you to have an infinite job running,
> but only a finite one (Thus batch/DataSet). Once that program finished
> execution, you would submit your streaming job starting from the written
> savepoint. I guess the API works with either HDFS or filesystem, but maybe
> someone who has already used the API might shed some more light for us
> here.
>
> Best regards
> Theo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> ------------------------------
> *Von: *"Marco Villalobos" <mv...@kineteque.com>
> *An: *"Theo Diefenthal" <th...@scoop-software.de>
> *CC: *"user" <us...@flink.apache.org>
> *Gesendet: *Donnerstag, 6. August 2020 23:47:13
>
> *Betreff: *Re: Two Queries and a Kafka Topic
>
> I am trying to use the State Processor API.  Does that require HDFS or a
> filesystem?
> I wish there was a complete example that ties in both DataSet and
> DataStream API, and the State Processor API.
>
> So far I have not been able to get it to work.
>
> Does anybody know where I can find examples of these type of techniques?
>
>
>
> On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <
> theo.diefenthal@scoop-software.de> wrote:
>
>> Hi Marco,
>>
>> In general, I see three solutions here you could approach:
>>
>> 1. Use the StateProcessorAPI: You can run a program with the
>> stateProcessorAPI that loads the data from JDBC and stores it into a Flink
>> SavePoint. Afterwards, you start your streaming job from that savepoint
>> which will load its state and within find all the data from JDBC stored
>> already.
>> 2. Load from master, distribute with the job: When you build up your
>> jobgraph, you could execute the JDBC queries and put the result into some
>> Serializable class which in turn you plug in a an operator in your stream
>> (e.g. a map stage). The class along with all the queried data will be
>> serialized and deserialized on the taskmanagers (Usually, I use this for
>> configuration parameters, but it might be ok in this case as well)
>> 3. Load from TaskManager: In your map-function, if the very first event
>> is received, you can block processing and synchronously load the data from
>> JDBC (So each Taskmanager performs the JDBC query itself). You then keep
>> the data to be used for all subsequent map steps.
>>
>> I think, option 3 is the easiest to be implemented while option 1 might
>> be the most elegant way in my opinion.
>>
>> Best regards
>> Theo
>>
>> ------------------------------
>> *Von: *"Marco Villalobos" <mv...@kineteque.com>
>> *An: *"Leonard Xu" <xb...@gmail.com>
>> *CC: *"user" <us...@flink.apache.org>
>> *Gesendet: *Mittwoch, 5. August 2020 04:33:23
>> *Betreff: *Re: Two Queries and a Kafka Topic
>>
>> Hi Leonard,
>>
>> First, Thank you.
>>
>> I am currently trying to restrict my solution to Apache Flink 1.10
>> because its the current version supported by Amazon EMR.
>> i am not ready to change our operational environment to solve this.
>>
>> Second, I am using the DataStream API.  The Kafka Topic is not in a
>> table, it is in a DataStream.
>>
>> The SQL queries are literally from a PostgresSQL database, and only need
>> to be run exactly once in the lifetime of the job.
>>
>> I am struggling to determine where this happens.
>>
>> JDBCInputFormat seems to query the SQL table repetitively, and also
>> connecting streams and aggregating into one object is very complicated.
>>
>> Thus, I am wondering what is the right approach.
>>
>> Let me restate the parameters.
>>
>> SQL Query One = data in PostgreSQL (200K records) that is used for
>> business logic.
>> SQL Query Two = data in PostgreSQL (1000 records) that is used for
>> business logic.
>> Kafka Topic One = unlimited data-stream that uses the data-stream api and
>> queries above to write into multiple sinks
>>
>> Asci Diagram:
>>
>> [SQL Query One] ----> [Aggregate to Map]
>>
>> Kafka ----> [Kafka Topic One]  --- [Keyed Process Function (Query One
>> Map, Query Two Map)] ---<[Multiple Sinks]
>>
>> [SQL Query Two] ---->[Aggregate to Map]
>>
>>
>> Maybe my graph above helps.  You see, I need Query One and Query Two only
>> ever execute once.  After that the information they provide are used to
>> correctly process the Kafka Topic.
>>
>> I'll take a deep further to try and understand what you said, thank you,
>> but JDBCInputFormat seem to repetitively query the database.  Maybe I need
>> to write a RichFunction or AsyncIO function and cache the results in state
>> after that.
>>
>>
>>
>> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xb...@gmail.com> wrote:
>>
>> Hi, Marco
>>
>> If I need SQL Query One and SQL Query Two to happen just one time,
>>
>>
>> Looks like you want to reuse this kafka table in one job, It’s supported
>> to execute multiple query in one sql job in Flink 1.11.
>> You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a
>> single SQL job[1].
>>
>>
>> Best
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
>>
>>
>> 在 2020年8月5日,04:34,Marco Villalobos <mv...@kineteque.com> 写道:
>>
>> Lets say that I have:
>>
>> SQL Query One from data in PostgreSQL (200K records).
>> SQL Query Two from data in PostgreSQL (1000 records).
>> and Kafka Topic One.
>>
>> Let's also say that main data from this Flink job arrives in Kafka Topic
>> One.
>>
>> If I need SQL Query One and SQL Query Two to happen just one time, when
>> the job starts up, and afterwards maybe store it in Keyed State or
>> Broadcast State, but it's not really part of the stream, then what is the
>> best practice for supporting that in Flink
>>
>> The Flink job needs to stream data from Kafka Topic One, aggregate it,
>> and perform computations that require all of the data in SQL Query One and
>> SQL Query Two to perform its business logic.
>>
>> I am using Flink 1.10.
>>
>> I supposed to query the database before the Job I submitted, and then
>> pass it on as parameters to a function?
>> Or am I supposed to use JDBCInputFormat for both queries and create two
>> streams, and somehow connect or broadcast both of them two the main stream
>> that uses Kafka Topic One?
>>
>> I would appreciate guidance. Please.  Thank you.
>>
>> Sincerely,
>>
>> Marco A. Villalobo
>>
>>

Re: Two Queries and a Kafka Topic

Posted by Theo Diefenthal <th...@scoop-software.de>.
Hi Marco, 

Sadly, I myself haven't ever used StateProcessorAPI either. 

I thought, the documentation here [1] is rather straight forward to be used, but I never tried it myself. 

Also, I don' get what you mean with "tieing" DataSet and DataStream? For what I understand, the StateProcessorAPI is internally upon DataSets at the moment. So if you have a Streaming job (DataStream), you would run your custom state migration program before even starting the streaming job using the StateProcessor API and initialize your state as a DataSet. After all, initializing a state doesn't require you to have an infinite job running, but only a finite one (Thus batch/DataSet). Once that program finished execution, you would submit your streaming job starting from the written savepoint. I guess the API works with either HDFS or filesystem, but maybe someone who has already used the API might shed some more light for us here. 

Best regards 
Theo 

[1] [ https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html | https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html ] 


Von: "Marco Villalobos" <mv...@kineteque.com> 
An: "Theo Diefenthal" <th...@scoop-software.de> 
CC: "user" <us...@flink.apache.org> 
Gesendet: Donnerstag, 6. August 2020 23:47:13 
Betreff: Re: Two Queries and a Kafka Topic 

I am trying to use the State Processor API. Does that require HDFS or a filesystem? 
I wish there was a complete example that ties in both DataSet and DataStream API, and the State Processor API. 

So far I have not been able to get it to work. 

Does anybody know where I can find examples of these type of techniques? 



On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal < [ mailto:theo.diefenthal@scoop-software.de | theo.diefenthal@scoop-software.de ] > wrote: 



Hi Marco, 

In general, I see three solutions here you could approach: 

1. Use the StateProcessorAPI: You can run a program with the stateProcessorAPI that loads the data from JDBC and stores it into a Flink SavePoint. Afterwards, you start your streaming job from that savepoint which will load its state and within find all the data from JDBC stored already. 
2. Load from master, distribute with the job: When you build up your jobgraph, you could execute the JDBC queries and put the result into some Serializable class which in turn you plug in a an operator in your stream (e.g. a map stage). The class along with all the queried data will be serialized and deserialized on the taskmanagers (Usually, I use this for configuration parameters, but it might be ok in this case as well) 
3. Load from TaskManager: In your map-function, if the very first event is received, you can block processing and synchronously load the data from JDBC (So each Taskmanager performs the JDBC query itself). You then keep the data to be used for all subsequent map steps. 

I think, option 3 is the easiest to be implemented while option 1 might be the most elegant way in my opinion. 

Best regards 
Theo 


Von: "Marco Villalobos" < [ mailto:mvillalobos@kineteque.com | mvillalobos@kineteque.com ] > 
An: "Leonard Xu" < [ mailto:xbjtdcq@gmail.com | xbjtdcq@gmail.com ] > 
CC: "user" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
Gesendet: Mittwoch, 5. August 2020 04:33:23 
Betreff: Re: Two Queries and a Kafka Topic 

Hi Leonard, 

First, Thank you. 

I am currently trying to restrict my solution to Apache Flink 1.10 because its the current version supported by Amazon EMR. 
i am not ready to change our operational environment to solve this. 

Second, I am using the DataStream API. The Kafka Topic is not in a table, it is in a DataStream. 

The SQL queries are literally from a PostgresSQL database, and only need to be run exactly once in the lifetime of the job. 

I am struggling to determine where this happens. 

JDBCInputFormat seems to query the SQL table repetitively, and also connecting streams and aggregating into one object is very complicated. 

Thus, I am wondering what is the right approach. 

Let me restate the parameters. 

SQL Query One = data in PostgreSQL (200K records) that is used for business logic. 
SQL Query Two = data in PostgreSQL (1000 records) that is used for business logic. 
Kafka Topic One = unlimited data-stream that uses the data-stream api and queries above to write into multiple sinks 

Asci Diagram: 

[ SQL Query One] ----> [Aggregate to Map] 

Kafka ----> [Kafka Topic One] --- [Keyed Process Function (Query One Map, Query Two Map)] ---<[Multiple Sinks] 

[ SQL Query Two] ----> [Aggregate to Map] 


Maybe my graph above helps. You see, I need Query One and Query Two only ever execute once. After that the information they provide are used to correctly process the Kafka Topic. 

I'll take a deep further to try and understand what you said, thank you, but JDBCInputFormat seem to repetitively query the database. Maybe I need to write a RichFunction or AsyncIO function and cache the results in state after that. 




BQ_BEGIN

On Aug 4, 2020, at 6:25 PM, Leonard Xu < [ mailto:xbjtdcq@gmail.com | xbjtdcq@gmail.com ] > wrote: 

Hi, Marco 


BQ_BEGIN

If I need SQL Query One and SQL Query Two to happen just one time, 



Looks like you want to reuse this kafka table in one job, It’s supported to execute multiple query in one sql job in Flink 1.11. 
You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a single SQL job[1]. 


Best 
Leonard 
[1] [ https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement | https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement ] 



BQ_BEGIN

在 2020年8月5日,04:34,Marco Villalobos < [ mailto:mvillalobos@kineteque.com | mvillalobos@kineteque.com ] > 写道: 

Lets say that I have: 

SQL Query One from data in PostgreSQL (200K records). 
SQL Query Two from data in PostgreSQL (1000 records). 
and Kafka Topic One. 

Let's also say that main data from this Flink job arrives in Kafka Topic One. 

If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store it in Keyed State or Broadcast State, but it's not really part of the stream, then what is the best practice for supporting that in Flink 

The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two to perform its business logic. 

I am using Flink 1.10. 

I supposed to query the database before the Job I submitted, and then pass it on as parameters to a function? 
Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them two the main stream that uses Kafka Topic One? 

I would appreciate guidance. Please. Thank you. 

Sincerely, 

Marco A. Villalobo 

BQ_END


BQ_END


BQ_END


Re: Two Queries and a Kafka Topic

Posted by Marco Villalobos <mv...@kineteque.com>.
I am trying to use the State Processor API.  Does that require HDFS or a
filesystem?

I wish there was a complete example that ties in both DataSet and
DataStream API, and the State Processor API.

So far I have not been able to get it to work.

Does anybody know where I can find examples of these type of techniques?



On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <
theo.diefenthal@scoop-software.de> wrote:

> Hi Marco,
>
> In general, I see three solutions here you could approach:
>
> 1. Use the StateProcessorAPI: You can run a program with the
> stateProcessorAPI that loads the data from JDBC and stores it into a Flink
> SavePoint. Afterwards, you start your streaming job from that savepoint
> which will load its state and within find all the data from JDBC stored
> already.
> 2. Load from master, distribute with the job: When you build up your
> jobgraph, you could execute the JDBC queries and put the result into some
> Serializable class which in turn you plug in a an operator in your stream
> (e.g. a map stage). The class along with all the queried data will be
> serialized and deserialized on the taskmanagers (Usually, I use this for
> configuration parameters, but it might be ok in this case as well)
> 3. Load from TaskManager: In your map-function, if the very first event is
> received, you can block processing and synchronously load the data from
> JDBC (So each Taskmanager performs the JDBC query itself). You then keep
> the data to be used for all subsequent map steps.
>
> I think, option 3 is the easiest to be implemented while option 1 might be
> the most elegant way in my opinion.
>
> Best regards
> Theo
>
> ------------------------------
> *Von: *"Marco Villalobos" <mv...@kineteque.com>
> *An: *"Leonard Xu" <xb...@gmail.com>
> *CC: *"user" <us...@flink.apache.org>
> *Gesendet: *Mittwoch, 5. August 2020 04:33:23
> *Betreff: *Re: Two Queries and a Kafka Topic
>
> Hi Leonard,
>
> First, Thank you.
>
> I am currently trying to restrict my solution to Apache Flink 1.10 because
> its the current version supported by Amazon EMR.
> i am not ready to change our operational environment to solve this.
>
> Second, I am using the DataStream API.  The Kafka Topic is not in a table,
> it is in a DataStream.
>
> The SQL queries are literally from a PostgresSQL database, and only need
> to be run exactly once in the lifetime of the job.
>
> I am struggling to determine where this happens.
>
> JDBCInputFormat seems to query the SQL table repetitively, and also
> connecting streams and aggregating into one object is very complicated.
>
> Thus, I am wondering what is the right approach.
>
> Let me restate the parameters.
>
> SQL Query One = data in PostgreSQL (200K records) that is used for
> business logic.
> SQL Query Two = data in PostgreSQL (1000 records) that is used for
> business logic.
> Kafka Topic One = unlimited data-stream that uses the data-stream api and
> queries above to write into multiple sinks
>
> Asci Diagram:
>
> [SQL Query One] ----> [Aggregate to Map]
>
> Kafka ----> [Kafka Topic One]  --- [Keyed Process Function (Query One Map,
> Query Two Map)] ---<[Multiple Sinks]
>
> [SQL Query Two] ---->[Aggregate to Map]
>
>
> Maybe my graph above helps.  You see, I need Query One and Query Two only
> ever execute once.  After that the information they provide are used to
> correctly process the Kafka Topic.
>
> I'll take a deep further to try and understand what you said, thank you,
> but JDBCInputFormat seem to repetitively query the database.  Maybe I need
> to write a RichFunction or AsyncIO function and cache the results in state
> after that.
>
>
>
> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xb...@gmail.com> wrote:
>
> Hi, Marco
>
> If I need SQL Query One and SQL Query Two to happen just one time,
>
>
> Looks like you want to reuse this kafka table in one job, It’s supported
> to execute multiple query in one sql job in Flink 1.11.
> You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a
> single SQL job[1].
>
>
> Best
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
>
>
> 在 2020年8月5日,04:34,Marco Villalobos <mv...@kineteque.com> 写道:
>
> Lets say that I have:
>
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
>
> Let's also say that main data from this Flink job arrives in Kafka Topic
> One.
>
> If I need SQL Query One and SQL Query Two to happen just one time, when
> the job starts up, and afterwards maybe store it in Keyed State or
> Broadcast State, but it's not really part of the stream, then what is the
> best practice for supporting that in Flink
>
> The Flink job needs to stream data from Kafka Topic One, aggregate it, and
> perform computations that require all of the data in SQL Query One and SQL
> Query Two to perform its business logic.
>
> I am using Flink 1.10.
>
> I supposed to query the database before the Job I submitted, and then pass
> it on as parameters to a function?
> Or am I supposed to use JDBCInputFormat for both queries and create two
> streams, and somehow connect or broadcast both of them two the main stream
> that uses Kafka Topic One?
>
> I would appreciate guidance. Please.  Thank you.
>
> Sincerely,
>
> Marco A. Villalobos
>
>

Re: Two Queries and a Kafka Topic

Posted by Marco Villalobos <mv...@kineteque.com>.
Hi Theo,

Thank you.

I just read the State Processor API in an effort to understand Option 1, it seems though I can just use a KeyedProcessFunction that loads the data just once (maybe on the "open" method), and serialize the values into MapState and use it from that point on.

Another option in documentation are CheckpointedFunction types, which were not clear in the documentation to me on how to use.

My data shares a common key, so this might be doable in KeyedProcessFunction.

Is that what you're suggesting?

Again, Thank you.

Marco A, Villalobos


> On Aug 5, 2020, at 3:52 AM, Theo Diefenthal <th...@scoop-software.de> wrote:
> 
> Hi Marco,
> 
> In general, I see three solutions here you could approach: 
> 
> 1. Use the StateProcessorAPI: You can run a program with the stateProcessorAPI that loads the data from JDBC and stores it into a Flink SavePoint. Afterwards, you start your streaming job from that savepoint which will load its state and within find all the data from JDBC stored already. 
> 2. Load from master, distribute with the job: When you build up your jobgraph, you could execute the JDBC queries and put the result into some Serializable class which in turn you plug in a an operator in your stream (e.g. a map stage). The class along with all the queried data will be serialized and deserialized on the taskmanagers (Usually, I use this for configuration parameters, but it might be ok in this case as well)
> 3. Load from TaskManager: In your map-function, if the very first event is received, you can block processing and synchronously load the data from JDBC (So each Taskmanager performs the JDBC query itself). You then keep the data to be used for all subsequent map steps. 
> 
> I think, option 3 is the easiest to be implemented while option 1 might be the most elegant way in my opinion. 
> 
> Best regards
> Theo
> 
> Von: "Marco Villalobos" <mv...@kineteque.com>
> An: "Leonard Xu" <xb...@gmail.com>
> CC: "user" <us...@flink.apache.org>
> Gesendet: Mittwoch, 5. August 2020 04:33:23
> Betreff: Re: Two Queries and a Kafka Topic
> 
> Hi Leonard,
> 
> First, Thank you.
> 
> I am currently trying to restrict my solution to Apache Flink 1.10 because its the current version supported by Amazon EMR.
> i am not ready to change our operational environment to solve this.
> 
> Second, I am using the DataStream API.  The Kafka Topic is not in a table, it is in a DataStream.
> 
> The SQL queries are literally from a PostgresSQL database, and only need to be run exactly once in the lifetime of the job.
> 
> I am struggling to determine where this happens.
> 
> JDBCInputFormat seems to query the SQL table repetitively, and also connecting streams and aggregating into one object is very complicated.
> 
> Thus, I am wondering what is the right approach.  
> 
> Let me restate the parameters.
> 
> SQL Query One = data in PostgreSQL (200K records) that is used for business logic.
> SQL Query Two = data in PostgreSQL (1000 records) that is used for business logic.
> Kafka Topic One = unlimited data-stream that uses the data-stream api and queries above to write into multiple sinks
> 
> Asci Diagram:
> 
> [SQL Query One] ----> [Aggregate to Map]											
> 
> 									Kafka ----> [Kafka Topic One]  --- [Keyed Process Function (Query One Map, Query Two Map)] ---<[Multiple Sinks] 
> 
> [SQL Query Two] ---->[Aggregate to Map]
> 
> 
> Maybe my graph above helps.  You see, I need Query One and Query Two only ever execute once.  After that the information they provide are used to correctly process the Kafka Topic.
> 
> I'll take a deep further to try and understand what you said, thank you, but JDBCInputFormat seem to repetitively query the database.  Maybe I need to write a RichFunction or AsyncIO function and cache the results in state after that.
> 
> 
> 
> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjtdcq@gmail.com <ma...@gmail.com>> wrote:
> 
> Hi, Marco
> 
> If I need SQL Query One and SQL Query Two to happen just one time,
> 
> Looks like you want to reuse this kafka table in one job, It’s supported to execute multiple query in one sql job in Flink 1.11. 
> You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a single SQL job[1].
> 
> 
> Best
> Leonard 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement>
> 
> 
> 在 2020年8月5日,04:34,Marco Villalobos <mvillalobos@kineteque.com <ma...@kineteque.com>> 写道:
> 
> Lets say that I have:
> 
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
> 
> Let's also say that main data from this Flink job arrives in Kafka Topic One.
> 
> If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store it in Keyed State or Broadcast State, but it's not really part of the stream, then what is the best practice for supporting that in Flink
> 
> The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two to perform its business logic.
> 
> I am using Flink 1.10.
> 
> I supposed to query the database before the Job I submitted, and then pass it on as parameters to a function?
> Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them two the main stream that uses Kafka Topic One?
> 
> I would appreciate guidance. Please.  Thank you.
> 
> Sincerely,
> 
> Marco A. Villalobos


Re: Two Queries and a Kafka Topic

Posted by Theo Diefenthal <th...@scoop-software.de>.
Hi Marco, 

In general, I see three solutions here you could approach: 

1. Use the StateProcessorAPI: You can run a program with the stateProcessorAPI that loads the data from JDBC and stores it into a Flink SavePoint. Afterwards, you start your streaming job from that savepoint which will load its state and within find all the data from JDBC stored already. 
2. Load from master, distribute with the job: When you build up your jobgraph, you could execute the JDBC queries and put the result into some Serializable class which in turn you plug in a an operator in your stream (e.g. a map stage). The class along with all the queried data will be serialized and deserialized on the taskmanagers (Usually, I use this for configuration parameters, but it might be ok in this case as well) 
3. Load from TaskManager: In your map-function, if the very first event is received, you can block processing and synchronously load the data from JDBC (So each Taskmanager performs the JDBC query itself). You then keep the data to be used for all subsequent map steps. 

I think, option 3 is the easiest to be implemented while option 1 might be the most elegant way in my opinion. 

Best regards 
Theo 


Von: "Marco Villalobos" <mv...@kineteque.com> 
An: "Leonard Xu" <xb...@gmail.com> 
CC: "user" <us...@flink.apache.org> 
Gesendet: Mittwoch, 5. August 2020 04:33:23 
Betreff: Re: Two Queries and a Kafka Topic 

Hi Leonard, 

First, Thank you. 

I am currently trying to restrict my solution to Apache Flink 1.10 because its the current version supported by Amazon EMR. 
i am not ready to change our operational environment to solve this. 

Second, I am using the DataStream API. The Kafka Topic is not in a table, it is in a DataStream. 

The SQL queries are literally from a PostgresSQL database, and only need to be run exactly once in the lifetime of the job. 

I am struggling to determine where this happens. 

JDBCInputFormat seems to query the SQL table repetitively, and also connecting streams and aggregating into one object is very complicated. 

Thus, I am wondering what is the right approach. 

Let me restate the parameters. 

SQL Query One = data in PostgreSQL (200K records) that is used for business logic. 
SQL Query Two = data in PostgreSQL (1000 records) that is used for business logic. 
Kafka Topic One = unlimited data-stream that uses the data-stream api and queries above to write into multiple sinks 

Asci Diagram: 

[ SQL Query One] ----> [Aggregate to Map] 

Kafka ----> [Kafka Topic One] --- [Keyed Process Function (Query One Map, Query Two Map)] ---<[Multiple Sinks] 

[ SQL Query Two] ----> [Aggregate to Map] 


Maybe my graph above helps. You see, I need Query One and Query Two only ever execute once. After that the information they provide are used to correctly process the Kafka Topic. 

I'll take a deep further to try and understand what you said, thank you, but JDBCInputFormat seem to repetitively query the database. Maybe I need to write a RichFunction or AsyncIO function and cache the results in state after that. 






On Aug 4, 2020, at 6:25 PM, Leonard Xu < [ mailto:xbjtdcq@gmail.com | xbjtdcq@gmail.com ] > wrote: 

Hi, Marco 


BQ_BEGIN

If I need SQL Query One and SQL Query Two to happen just one time, 



Looks like you want to reuse this kafka table in one job, It’s supported to execute multiple query in one sql job in Flink 1.11. 
You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a single SQL job[1]. 


Best 
Leonard 
[1] [ https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement | https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement ] 



BQ_BEGIN

在 2020年8月5日,04:34,Marco Villalobos < [ mailto:mvillalobos@kineteque.com | mvillalobos@kineteque.com ] > 写道: 

Lets say that I have: 

SQL Query One from data in PostgreSQL (200K records). 
SQL Query Two from data in PostgreSQL (1000 records). 
and Kafka Topic One. 

Let's also say that main data from this Flink job arrives in Kafka Topic One. 

If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store it in Keyed State or Broadcast State, but it's not really part of the stream, then what is the best practice for supporting that in Flink 

The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two to perform its business logic. 

I am using Flink 1.10. 

I supposed to query the database before the Job I submitted, and then pass it on as parameters to a function? 
Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them two the main stream that uses Kafka Topic One? 

I would appreciate guidance. Please. Thank you. 

Sincerely, 

Marco A. Villalobos 

BQ_END


BQ_END


Re: Two Queries and a Kafka Topic

Posted by Marco Villalobos <mv...@kineteque.com>.
Hi Leonard,

First, Thank you.

I am currently trying to restrict my solution to Apache Flink 1.10 because its the current version supported by Amazon EMR.
i am not ready to change our operational environment to solve this.

Second, I am using the DataStream API.  The Kafka Topic is not in a table, it is in a DataStream.

The SQL queries are literally from a PostgresSQL database, and only need to be run exactly once in the lifetime of the job.

I am struggling to determine where this happens.

JDBCInputFormat seems to query the SQL table repetitively, and also connecting streams and aggregating into one object is very complicated.

Thus, I am wondering what is the right approach.  

Let me restate the parameters.

SQL Query One = data in PostgreSQL (200K records) that is used for business logic.
SQL Query Two = data in PostgreSQL (1000 records) that is used for business logic.
Kafka Topic One = unlimited data-stream that uses the data-stream api and queries above to write into multiple sinks

Asci Diagram:

[SQL Query One] ----> [Aggregate to Map]											

									Kafka ----> [Kafka Topic One]  --- [Keyed Process Function (Query One Map, Query Two Map)] ---<[Multiple Sinks] 

[SQL Query Two] ---->[Aggregate to Map]


Maybe my graph above helps.  You see, I need Query One and Query Two only ever execute once.  After that the information they provide are used to correctly process the Kafka Topic.

I'll take a deep further to try and understand what you said, thank you, but JDBCInputFormat seem to repetitively query the database.  Maybe I need to write a RichFunction or AsyncIO function and cache the results in state after that.



> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xb...@gmail.com> wrote:
> 
> Hi, Marco
> 
>> If I need SQL Query One and SQL Query Two to happen just one time,
> 
> Looks like you want to reuse this kafka table in one job, It’s supported to execute multiple query in one sql job in Flink 1.11. 
> You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a single SQL job[1].
> 
> 
> Best
> Leonard 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement>
> 
> 
>> 在 2020年8月5日,04:34,Marco Villalobos <mvillalobos@kineteque.com <ma...@kineteque.com>> 写道:
>> 
>> Lets say that I have:
>> 
>> SQL Query One from data in PostgreSQL (200K records).
>> SQL Query Two from data in PostgreSQL (1000 records).
>> and Kafka Topic One.
>> 
>> Let's also say that main data from this Flink job arrives in Kafka Topic One.
>> 
>> If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store it in Keyed State or Broadcast State, but it's not really part of the stream, then what is the best practice for supporting that in Flink
>> 
>> The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two to perform its business logic.
>> 
>> I am using Flink 1.10.
>> 
>> I supposed to query the database before the Job I submitted, and then pass it on as parameters to a function?
>> Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them two the main stream that uses Kafka Topic One?
>> 
>> I would appreciate guidance. Please.  Thank you.
>> 
>> Sincerely,
>> 
>> Marco A. Villalobos
>> 
>> 
>> 
> 


Re: Two Queries and a Kafka Topic

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Marco

> If I need SQL Query One and SQL Query Two to happen just one time,

Looks like you want to reuse this kafka table in one job, It’s supported to execute multiple query in one sql job in Flink 1.11. 
You can use `StatementSet`[1] to add SQL Query one and SQL query Two in a single SQL job[1].


Best
Leonard 
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement>


> 在 2020年8月5日,04:34,Marco Villalobos <mv...@kineteque.com> 写道:
> 
> Lets say that I have:
> 
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
> 
> Let's also say that main data from this Flink job arrives in Kafka Topic One.
> 
> If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store it in Keyed State or Broadcast State, but it's not really part of the stream, then what is the best practice for supporting that in Flink
> 
> The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two to perform its business logic.
> 
> I am using Flink 1.10.
> 
> I supposed to query the database before the Job I submitted, and then pass it on as parameters to a function?
> Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them two the main stream that uses Kafka Topic One?
> 
> I would appreciate guidance. Please.  Thank you.
> 
> Sincerely,
> 
> Marco A. Villalobos
> 
> 
> 


Re: Two Queries and a Kafka Topic

Posted by Danny Chan <yu...@gmail.com>.
Hi, Marco ~

It seems what you need is a temporal join from the SQL side, you can define 2 Flink tables for your PostgreSQL ones and join your Kafka stream with them [1][3].

Flink 1.10 also supports this. There is some difference with the DDL compared to 1.11 [2]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html#temporal-table

Best,
Danny Chan
在 2020年8月5日 +0800 AM4:34,Marco Villalobos <mv...@kineteque.com>,写道:
> Lets say that I have:
>
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
>
> Let's also say that main data from this Flink job arrives in Kafka Topic One.
>
> If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store it in Keyed State or Broadcast State, but it's not really part of the stream, then what is the best practice for supporting that in Flink
>
> The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two to perform its business logic.
>
> I am using Flink 1.10.
>
> I supposed to query the database before the Job I submitted, and then pass it on as parameters to a function?
> Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them two the main stream that uses Kafka Topic One?
>
> I would appreciate guidance. Please.  Thank you.
>
> Sincerely,
>
> Marco A. Villalobos
>
>
>