You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Maatary Okouya <ma...@gmail.com> on 2020/03/28 06:47:30 UTC

Re: [External] Re: From Kafka Stream to Flink

Hi all,

Just wondering what is the status at this point?

On Thu, Sep 19, 2019 at 4:38 PM Hequn Cheng <ch...@gmail.com> wrote:

> Hi,
>
> Fabian is totally right. Big thanks to the detailed answers and nice
> examples above.
>
> As for the PR, very sorry about the delay. It is mainly because of the
> merge of blink and my work switching to Flink Python recently.
> However, I think the later version of blink would cover this feature
> natively with further merges.
>
> Before that, I think we can use the solution Fabian provided above.
>
> There are some examples here[1][2] which may be helpful to you
> @Casado @Maatary.
> In [1], the test case quite matches your scenario(perform join after
> groupby+last_value). It also provides the udaf what you want and shows how
> to register it.
> In [2], the test shows how to use the built-in last_value in SQL. Note
> that the built-in last_value UDAF is only supported in blink-planner from
> flink-1.9.0. If you are using the flink-planner(or version before that),
> you can register the last_value UDAF with the TableEnvironment like it is
> showed in [1].
>
> Feel free to ask if there are other problems.
>
> Best, Hequn
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala#L207
> [2]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala#L228
>
> On Thu, Sep 19, 2019 at 9:40 PM Casado Tejedor, Rubén <
> ruben.casado.tejedor@accenture.com> wrote:
>
>> Thanks Fabian. @Hequn Cheng <ch...@gmail.com> Could you share the
>> status? Thanks for your amazing work!
>>
>>
>>
>> *De: *Fabian Hueske <fh...@gmail.com>
>> *Fecha: *viernes, 16 de agosto de 2019, 9:30
>> *Para: *"Casado Tejedor, Rubén" <ru...@accenture.com>
>> *CC: *Maatary Okouya <ma...@gmail.com>, miki haiat <
>> miko5054@gmail.com>, user <us...@flink.apache.org>, Hequn Cheng <
>> chenghequn@gmail.com>
>> *Asunto: *Re: [External] Re: From Kafka Stream to Flink
>>
>>
>>
>> Hi Ruben,
>>
>>
>>
>> Work on this feature has already started [1], but stalled a bit (probably
>> due to the effort of merging the new Blink query processor).
>>
>> Hequn (in CC) is the guy working on upsert table ingestion, maybe he can
>> share what the status of this feature is.
>>
>>
>>
>> Best, Fabian
>>
>>
>>
>> [1] https://github.com/apache/flink/pull/6787
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink_pull_6787&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=rosq06snxAUogg77Y2SAZnaQef16zpmhGPcoGNUd4vg&s=0Mc6IZBBxqaJ6S_possk4V4ZTpdNphlZ3NoNPeL6NGA&e=>
>>
>>
>>
>> Am Di., 13. Aug. 2019 um 11:05 Uhr schrieb Casado Tejedor, Rubén <
>> ruben.casado.tejedor@accenture.com>:
>>
>> Hi
>>
>>
>>
>> Do you have an expected version of Flink to include the capability to
>> ingest an upsert stream as a dynamic table? We have such need in our
>> current project. What we have done is to emulate such behavior working at
>> low level with states (e.g. update existing value if key exists, create a
>> new value if key does not exist). But we cannot use SQL that would help as
>> to do it faster.
>>
>>
>>
>> Our use case is many small flink jobs that have to something like:
>>
>>
>>
>> SELECT *some fields*
>>
>> FROM *t1* INNER JOIN *t1 on t1.id
>> <https://urldefense.proofpoint.com/v2/url?u=http-3A__t1.id&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=rosq06snxAUogg77Y2SAZnaQef16zpmhGPcoGNUd4vg&s=5ReK8KBJ2AMxI8faigLTfxwAxvlvXbtPG48TzkLZbXc&e=>
>> = t2.id
>> <https://urldefense.proofpoint.com/v2/url?u=http-3A__t2.id&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=rosq06snxAUogg77Y2SAZnaQef16zpmhGPcoGNUd4vg&s=BnXyZjU0mHMrZ-gu7wRz5GUirxitCuQcCFjd8nbVNyw&e=>
>> (maybe join +3 tables)*
>>
>> WHERE *some conditions on fields*;
>>
>>
>>
>> We need the result of that queries taking into account only the last
>> values of each row. The result is inserted/updated in a in-memory K-V
>> database for fast access.
>>
>>
>>
>> Thanks in advance!
>>
>>
>>
>> Best
>>
>>
>>
>> *De: *Fabian Hueske <fh...@gmail.com>
>> *Fecha: *miércoles, 7 de agosto de 2019, 11:08
>> *Para: *Maatary Okouya <ma...@gmail.com>
>> *CC: *miki haiat <mi...@gmail.com>, user <us...@flink.apache.org>
>> *Asunto: *[External] Re: From Kafka Stream to Flink
>>
>>
>>
>> This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with
>> links and attachments.
>> ------------------------------
>>
>>
>>
>> Hi,
>>
>>
>>
>> LAST_VAL is not a built-in function, so you'd need to implement it as a
>> user-defined aggregate function (UDAGG) and register it.
>>
>>
>>
>> The problem with joining an append only table with an updating table is
>> the following.
>>
>>
>>
>> Consider two tables: users (uid, name, zip) and orders (oid, uid,
>> product), with user being an updating table and orders being append only.
>>
>>
>>
>> On January 1st, the tables look like this:
>>
>>
>>
>> Users:
>>
>> uid_1, Fred, 12345
>>
>> uid_2, Mary, 67890
>>
>>
>>
>> Orders
>>
>> oid_1, uid_1, Popcorn
>>
>> oid_2, uid_2, Carrots
>>
>>
>>
>> Joining both tables with the following query SELECT oid, product, name,
>> zip FROM users u, orders o WHERE u.uid = o.uid results in:
>>
>>
>>
>> oid_1, Popcorn, Fred, 12345
>>
>> oid_2, Carrots, Mary, 67890
>>
>>
>>
>> Whenever, a new order is appended, we look up the corresponding user
>> data, perform the join and emit the results.
>>
>> Let's say on July 1st we have received 100 orders from our two users all
>> is fine. However, on July 2nd Fred updates his zip code because he moved to
>> another city.
>>
>> Our data now looks like this:
>>
>>
>>
>> Users:
>>
>> uid_1, Fred, 24680
>>
>> uid_2, Mary, 67890
>>
>>
>>
>> Orders
>>
>> oid_1, uid_1, Popcorn
>>
>> oid_2, uid_2, Carrots
>>
>> ....
>>
>> oid_100, uid_2, Potatoes
>>
>>
>>
>> The result of the same query as before is:
>>
>>
>>
>> oid_1, Popcorn, Fred, 24680
>>
>> oid_2, Carrots, Mary, 67890
>>
>> ....
>>
>> oid_100, Potatoes, Mary, 67890
>>
>>
>>
>> Notice how the first row changed?
>>
>> If we strictly follow SQL semantics (which we do in Flink SQL) the query
>> needs to update the ZIP code of the first result row.
>>
>> In order to do so, we need access to the original data of the orders
>> table, which is the append only table in our scenario.
>>
>> Consequently, we need to fully materialize append only tables when they
>> are joined with an updating table without temporal constraints.
>>
>>
>>
>> In many situations, the indented semantics for such a query would be to
>> join the order with the ZIP code of the user *that was valid at the time
>> when the order was placed*.
>>
>> However, this is *not* semantics of the query of our example. For such a
>> query, we need to model the data differently. The users table needs to
>> store all modifications, i.e., the full history of all updates.
>>
>> Each update needs a timestamp and each order needs a timestamp as well.
>> With these timestamps, we can write a query that joins an order with the
>> user data that we valid at the time when the order was placed.
>>
>> This is the temporal constraint that I mentioned before. With this
>> constraint, Flink can use the information about progressing time to reason
>> about how much state it needs to keep because a change of the user table
>> will only affect future orders.
>>
>>
>>
>> Flink makes this a lot easier with the concept of temporal tables [1] and
>> temporal table joins [2].
>>
>>
>>
>> Best,
>>
>> Fabian
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/temporal_tables.html
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.8_dev_table_streaming_temporal-5Ftables.html&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=qsYWKlnwGvoMp8BTKycFnjrxmmg3GLXf_AbKoXOAb_A&s=dkqRgaz4ropnGdtxuW8gYVx8hJNuPKdgS3K7hge-qvY&e=>
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/joins.html#join-with-a-temporal-table
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.8_dev_table_streaming_joins.html-23join-2Dwith-2Da-2Dtemporal-2Dtable&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=qsYWKlnwGvoMp8BTKycFnjrxmmg3GLXf_AbKoXOAb_A&s=YVSwoF7IzyDGMzL-0n1o5BolAcRxd4qN0u0ITq1cwXE&e=>
>>
>>
>>
>>
>>
>> Am Di., 6. Aug. 2019 um 21:09 Uhr schrieb Maatary Okouya <
>> maatariokouya@gmail.com>:
>>
>> Fabian,
>>
>>
>>
>> ultimately, i just want to perform a join on the last values for each
>> keys.
>>
>>
>>
>> On Tue, Aug 6, 2019 at 8:07 PM Maatary Okouya <ma...@gmail.com>
>> wrote:
>>
>> Fabian,
>>
>>
>>
>> could you please clarify the following statement:
>>
>>
>>
>> However joining an append-only table with this view without adding
>> temporal join condition, means that the stream is fully materialized as
>> state.
>>
>> This is because previously emitted results must be updated when the view
>> changes.
>>
>> It really depends on the semantics of the join and query that you need,
>> how much state the query will need to maintain.
>>
>>
>>
>>
>>
>> I am not sure to understand the problem. If i have to append-only table
>> and perform some join on it, what's the issue ?
>>
>>
>>
>>
>>
>> On Tue, Aug 6, 2019 at 8:03 PM Maatary Okouya <ma...@gmail.com>
>> wrote:
>>
>> Thank you for the clarification. Really appreciated.
>>
>>
>>
>> Is Last_val part of the API ?
>>
>>
>>
>> On Fri, Aug 2, 2019 at 10:49 AM Fabian Hueske <fh...@gmail.com> wrote:
>>
>> Hi,
>>
>>
>>
>> Flink does not distinguish between streams and tables. For the Table API
>> / SQL, there are only tables that are changing over time, i.e., dynamic
>> tables.
>>
>> A Stream in the Kafka Streams or KSQL sense, is in Flink a Table with
>> append-only changes, i.e., records are only inserted and never deleted or
>> modified.
>>
>> A Table in the Kafka Streams or KSQL sense, is in Flink a Table that has
>> upsert and delete changes, i.e., the table has a unique key and records are
>> inserted, deleted, or updated per key.
>>
>>
>>
>> In the current version, Flink does not have native support to ingest an
>> upsert stream as a dynamic table (right now only append-only tables can be
>> ingested, native support for upsert tables will be added soon.).
>>
>> However, you can create a view with the following SQL query on an
>> append-only table that creates an upsert table:
>>
>>
>>
>> SELECT key, LAST_VAL(v1), LAST_VAL(v2), ...
>>
>> FROM appendOnlyTable
>>
>> GROUP BY key
>>
>>
>>
>> Given, this view, you can run all kinds of SQL queries on it.
>>
>> However joining an append-only table with this view without adding
>> temporal join condition, means that the stream is fully materialized as
>> state.
>>
>> This is because previously emitted results must be updated when the view
>> changes.
>>
>> It really depends on the semantics of the join and query that you need,
>> how much state the query will need to maintain.
>>
>>
>>
>> An alternative to using Table API / SQL and it's dynamic table
>> abstraction is to use Flink's DataStream API and ProcessFunctions.
>>
>> These APIs are more low level and expose access to state and timers,
>> which are the core ingredients for stream processing.
>>
>> You can implement pretty much all logic of KStreams and more in these
>> APIs.
>>
>>
>>
>> Best, Fabian
>>
>>
>>
>>
>>
>> Am Di., 23. Juli 2019 um 13:06 Uhr schrieb Maatary Okouya <
>> maatariokouya@gmail.com>:
>>
>> I would like to have a KTable, or maybe in Flink term a dynamic Table,
>> that only contains the latest value for each keyed record. This would allow
>> me to perform aggregation and join, based on the latest state of every
>> record, as opposed to every record over time, or a period of time.
>>
>>
>>
>> On Sun, Jul 21, 2019 at 8:21 AM miki haiat <mi...@gmail.com> wrote:
>>
>> Can you elaborate more  about your use case .
>>
>>
>>
>> On Sat, Jul 20, 2019 at 1:04 AM Maatary Okouya <ma...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>>
>>
>> I am a user of Kafka Stream so far. However, because i have been face
>> with several limitation in particular in performing Join on KTable.
>>
>>
>>
>> I was wondering what is the appraoch in Flink to achieve  (1) the concept
>> of KTable, i.e. a Table that represent a changeLog, i.e. only the latest
>> version of all keyed records,  and (2) joining those.
>>
>>
>>
>> There are currently a lot of limitation around that on Kafka Stream, and
>> i need that for performing some ETL process, where i need to mirror entire
>> databases in Kafka, and then do some join on the table to emit the logical
>> entity in Kafka Topics. I was hoping that somehow i could acheive that by
>> using FLink as intermediary.
>>
>>
>>
>> I can see that you support any kind of join, but i just don't see the
>> notion of Ktable.
>>
>>
>>
>>
>>
>>
>> ------------------------------
>>
>>
>> This message is for the designated recipient only and may contain
>> privileged, proprietary, or otherwise confidential information. If you have
>> received it in error, please notify the sender immediately and delete the
>> original. Any other use of the e-mail by you is prohibited. Where allowed
>> by local law, electronic communications with Accenture and its affiliates,
>> including e-mail and instant messaging (including content), may be scanned
>> by our systems for the purposes of information security and assessment of
>> internal compliance with Accenture policy. Your privacy is important to us.
>> Accenture uses your personal data only in compliance with data protection
>> laws. For further information on how Accenture processes your personal
>> data, please see our privacy statement at
>> https://www.accenture.com/us-en/privacy-policy.
>>
>> ______________________________________________________________________________________
>>
>> www.accenture.com
>>
>>

Re: [External] Re: From Kafka Stream to Flink

Posted by Kurt Young <yk...@gmail.com>.
I think this requirement can be satisfied by temporal table function [1],
am I missing anything?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#temporal-table-function

Best,
Kurt


On Sat, Mar 28, 2020 at 2:47 PM Maatary Okouya <ma...@gmail.com>
wrote:

> Hi all,
>
> Just wondering what is the status at this point?
>
> On Thu, Sep 19, 2019 at 4:38 PM Hequn Cheng <ch...@gmail.com> wrote:
>
>> Hi,
>>
>> Fabian is totally right. Big thanks to the detailed answers and nice
>> examples above.
>>
>> As for the PR, very sorry about the delay. It is mainly because of the
>> merge of blink and my work switching to Flink Python recently.
>> However, I think the later version of blink would cover this feature
>> natively with further merges.
>>
>> Before that, I think we can use the solution Fabian provided above.
>>
>> There are some examples here[1][2] which may be helpful to you
>> @Casado @Maatary.
>> In [1], the test case quite matches your scenario(perform join after
>> groupby+last_value). It also provides the udaf what you want and shows how
>> to register it.
>> In [2], the test shows how to use the built-in last_value in SQL. Note
>> that the built-in last_value UDAF is only supported in blink-planner from
>> flink-1.9.0. If you are using the flink-planner(or version before that),
>> you can register the last_value UDAF with the TableEnvironment like it is
>> showed in [1].
>>
>> Feel free to ask if there are other problems.
>>
>> Best, Hequn
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala#L207
>> [2]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala#L228
>>
>> On Thu, Sep 19, 2019 at 9:40 PM Casado Tejedor, Rubén <
>> ruben.casado.tejedor@accenture.com> wrote:
>>
>>> Thanks Fabian. @Hequn Cheng <ch...@gmail.com> Could you share the
>>> status? Thanks for your amazing work!
>>>
>>>
>>>
>>> *De: *Fabian Hueske <fh...@gmail.com>
>>> *Fecha: *viernes, 16 de agosto de 2019, 9:30
>>> *Para: *"Casado Tejedor, Rubén" <ru...@accenture.com>
>>> *CC: *Maatary Okouya <ma...@gmail.com>, miki haiat <
>>> miko5054@gmail.com>, user <us...@flink.apache.org>, Hequn Cheng <
>>> chenghequn@gmail.com>
>>> *Asunto: *Re: [External] Re: From Kafka Stream to Flink
>>>
>>>
>>>
>>> Hi Ruben,
>>>
>>>
>>>
>>> Work on this feature has already started [1], but stalled a bit
>>> (probably due to the effort of merging the new Blink query processor).
>>>
>>> Hequn (in CC) is the guy working on upsert table ingestion, maybe he can
>>> share what the status of this feature is.
>>>
>>>
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> [1] https://github.com/apache/flink/pull/6787
>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink_pull_6787&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=rosq06snxAUogg77Y2SAZnaQef16zpmhGPcoGNUd4vg&s=0Mc6IZBBxqaJ6S_possk4V4ZTpdNphlZ3NoNPeL6NGA&e=>
>>>
>>>
>>>
>>> Am Di., 13. Aug. 2019 um 11:05 Uhr schrieb Casado Tejedor, Rubén <
>>> ruben.casado.tejedor@accenture.com>:
>>>
>>> Hi
>>>
>>>
>>>
>>> Do you have an expected version of Flink to include the capability to
>>> ingest an upsert stream as a dynamic table? We have such need in our
>>> current project. What we have done is to emulate such behavior working at
>>> low level with states (e.g. update existing value if key exists, create a
>>> new value if key does not exist). But we cannot use SQL that would help as
>>> to do it faster.
>>>
>>>
>>>
>>> Our use case is many small flink jobs that have to something like:
>>>
>>>
>>>
>>> SELECT *some fields*
>>>
>>> FROM *t1* INNER JOIN *t1 on t1.id
>>> <https://urldefense.proofpoint.com/v2/url?u=http-3A__t1.id&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=rosq06snxAUogg77Y2SAZnaQef16zpmhGPcoGNUd4vg&s=5ReK8KBJ2AMxI8faigLTfxwAxvlvXbtPG48TzkLZbXc&e=>
>>> = t2.id
>>> <https://urldefense.proofpoint.com/v2/url?u=http-3A__t2.id&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=rosq06snxAUogg77Y2SAZnaQef16zpmhGPcoGNUd4vg&s=BnXyZjU0mHMrZ-gu7wRz5GUirxitCuQcCFjd8nbVNyw&e=>
>>> (maybe join +3 tables)*
>>>
>>> WHERE *some conditions on fields*;
>>>
>>>
>>>
>>> We need the result of that queries taking into account only the last
>>> values of each row. The result is inserted/updated in a in-memory K-V
>>> database for fast access.
>>>
>>>
>>>
>>> Thanks in advance!
>>>
>>>
>>>
>>> Best
>>>
>>>
>>>
>>> *De: *Fabian Hueske <fh...@gmail.com>
>>> *Fecha: *miércoles, 7 de agosto de 2019, 11:08
>>> *Para: *Maatary Okouya <ma...@gmail.com>
>>> *CC: *miki haiat <mi...@gmail.com>, user <us...@flink.apache.org>
>>> *Asunto: *[External] Re: From Kafka Stream to Flink
>>>
>>>
>>>
>>> This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with
>>> links and attachments.
>>> ------------------------------
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> LAST_VAL is not a built-in function, so you'd need to implement it as a
>>> user-defined aggregate function (UDAGG) and register it.
>>>
>>>
>>>
>>> The problem with joining an append only table with an updating table is
>>> the following.
>>>
>>>
>>>
>>> Consider two tables: users (uid, name, zip) and orders (oid, uid,
>>> product), with user being an updating table and orders being append only.
>>>
>>>
>>>
>>> On January 1st, the tables look like this:
>>>
>>>
>>>
>>> Users:
>>>
>>> uid_1, Fred, 12345
>>>
>>> uid_2, Mary, 67890
>>>
>>>
>>>
>>> Orders
>>>
>>> oid_1, uid_1, Popcorn
>>>
>>> oid_2, uid_2, Carrots
>>>
>>>
>>>
>>> Joining both tables with the following query SELECT oid, product, name,
>>> zip FROM users u, orders o WHERE u.uid = o.uid results in:
>>>
>>>
>>>
>>> oid_1, Popcorn, Fred, 12345
>>>
>>> oid_2, Carrots, Mary, 67890
>>>
>>>
>>>
>>> Whenever, a new order is appended, we look up the corresponding user
>>> data, perform the join and emit the results.
>>>
>>> Let's say on July 1st we have received 100 orders from our two users all
>>> is fine. However, on July 2nd Fred updates his zip code because he moved to
>>> another city.
>>>
>>> Our data now looks like this:
>>>
>>>
>>>
>>> Users:
>>>
>>> uid_1, Fred, 24680
>>>
>>> uid_2, Mary, 67890
>>>
>>>
>>>
>>> Orders
>>>
>>> oid_1, uid_1, Popcorn
>>>
>>> oid_2, uid_2, Carrots
>>>
>>> ....
>>>
>>> oid_100, uid_2, Potatoes
>>>
>>>
>>>
>>> The result of the same query as before is:
>>>
>>>
>>>
>>> oid_1, Popcorn, Fred, 24680
>>>
>>> oid_2, Carrots, Mary, 67890
>>>
>>> ....
>>>
>>> oid_100, Potatoes, Mary, 67890
>>>
>>>
>>>
>>> Notice how the first row changed?
>>>
>>> If we strictly follow SQL semantics (which we do in Flink SQL) the query
>>> needs to update the ZIP code of the first result row.
>>>
>>> In order to do so, we need access to the original data of the orders
>>> table, which is the append only table in our scenario.
>>>
>>> Consequently, we need to fully materialize append only tables when they
>>> are joined with an updating table without temporal constraints.
>>>
>>>
>>>
>>> In many situations, the indented semantics for such a query would be to
>>> join the order with the ZIP code of the user *that was valid at the time
>>> when the order was placed*.
>>>
>>> However, this is *not* semantics of the query of our example. For such a
>>> query, we need to model the data differently. The users table needs to
>>> store all modifications, i.e., the full history of all updates.
>>>
>>> Each update needs a timestamp and each order needs a timestamp as well.
>>> With these timestamps, we can write a query that joins an order with the
>>> user data that we valid at the time when the order was placed.
>>>
>>> This is the temporal constraint that I mentioned before. With this
>>> constraint, Flink can use the information about progressing time to reason
>>> about how much state it needs to keep because a change of the user table
>>> will only affect future orders.
>>>
>>>
>>>
>>> Flink makes this a lot easier with the concept of temporal tables [1]
>>> and temporal table joins [2].
>>>
>>>
>>>
>>> Best,
>>>
>>> Fabian
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/temporal_tables.html
>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.8_dev_table_streaming_temporal-5Ftables.html&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=qsYWKlnwGvoMp8BTKycFnjrxmmg3GLXf_AbKoXOAb_A&s=dkqRgaz4ropnGdtxuW8gYVx8hJNuPKdgS3K7hge-qvY&e=>
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/joins.html#join-with-a-temporal-table
>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.8_dev_table_streaming_joins.html-23join-2Dwith-2Da-2Dtemporal-2Dtable&d=DwMFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=brkRAgrW3LbdVDOiRLzI7SFUIWBL5aa2MIfENljA8xoe0lFg2u3-S6GnFTH7Pbmc&m=qsYWKlnwGvoMp8BTKycFnjrxmmg3GLXf_AbKoXOAb_A&s=YVSwoF7IzyDGMzL-0n1o5BolAcRxd4qN0u0ITq1cwXE&e=>
>>>
>>>
>>>
>>>
>>>
>>> Am Di., 6. Aug. 2019 um 21:09 Uhr schrieb Maatary Okouya <
>>> maatariokouya@gmail.com>:
>>>
>>> Fabian,
>>>
>>>
>>>
>>> ultimately, i just want to perform a join on the last values for each
>>> keys.
>>>
>>>
>>>
>>> On Tue, Aug 6, 2019 at 8:07 PM Maatary Okouya <ma...@gmail.com>
>>> wrote:
>>>
>>> Fabian,
>>>
>>>
>>>
>>> could you please clarify the following statement:
>>>
>>>
>>>
>>> However joining an append-only table with this view without adding
>>> temporal join condition, means that the stream is fully materialized as
>>> state.
>>>
>>> This is because previously emitted results must be updated when the view
>>> changes.
>>>
>>> It really depends on the semantics of the join and query that you need,
>>> how much state the query will need to maintain.
>>>
>>>
>>>
>>>
>>>
>>> I am not sure to understand the problem. If i have to append-only table
>>> and perform some join on it, what's the issue ?
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Aug 6, 2019 at 8:03 PM Maatary Okouya <ma...@gmail.com>
>>> wrote:
>>>
>>> Thank you for the clarification. Really appreciated.
>>>
>>>
>>>
>>> Is Last_val part of the API ?
>>>
>>>
>>>
>>> On Fri, Aug 2, 2019 at 10:49 AM Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> Flink does not distinguish between streams and tables. For the Table API
>>> / SQL, there are only tables that are changing over time, i.e., dynamic
>>> tables.
>>>
>>> A Stream in the Kafka Streams or KSQL sense, is in Flink a Table with
>>> append-only changes, i.e., records are only inserted and never deleted or
>>> modified.
>>>
>>> A Table in the Kafka Streams or KSQL sense, is in Flink a Table that has
>>> upsert and delete changes, i.e., the table has a unique key and records are
>>> inserted, deleted, or updated per key.
>>>
>>>
>>>
>>> In the current version, Flink does not have native support to ingest an
>>> upsert stream as a dynamic table (right now only append-only tables can be
>>> ingested, native support for upsert tables will be added soon.).
>>>
>>> However, you can create a view with the following SQL query on an
>>> append-only table that creates an upsert table:
>>>
>>>
>>>
>>> SELECT key, LAST_VAL(v1), LAST_VAL(v2), ...
>>>
>>> FROM appendOnlyTable
>>>
>>> GROUP BY key
>>>
>>>
>>>
>>> Given, this view, you can run all kinds of SQL queries on it.
>>>
>>> However joining an append-only table with this view without adding
>>> temporal join condition, means that the stream is fully materialized as
>>> state.
>>>
>>> This is because previously emitted results must be updated when the view
>>> changes.
>>>
>>> It really depends on the semantics of the join and query that you need,
>>> how much state the query will need to maintain.
>>>
>>>
>>>
>>> An alternative to using Table API / SQL and it's dynamic table
>>> abstraction is to use Flink's DataStream API and ProcessFunctions.
>>>
>>> These APIs are more low level and expose access to state and timers,
>>> which are the core ingredients for stream processing.
>>>
>>> You can implement pretty much all logic of KStreams and more in these
>>> APIs.
>>>
>>>
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>>
>>>
>>> Am Di., 23. Juli 2019 um 13:06 Uhr schrieb Maatary Okouya <
>>> maatariokouya@gmail.com>:
>>>
>>> I would like to have a KTable, or maybe in Flink term a dynamic Table,
>>> that only contains the latest value for each keyed record. This would allow
>>> me to perform aggregation and join, based on the latest state of every
>>> record, as opposed to every record over time, or a period of time.
>>>
>>>
>>>
>>> On Sun, Jul 21, 2019 at 8:21 AM miki haiat <mi...@gmail.com> wrote:
>>>
>>> Can you elaborate more  about your use case .
>>>
>>>
>>>
>>> On Sat, Jul 20, 2019 at 1:04 AM Maatary Okouya <ma...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> I am a user of Kafka Stream so far. However, because i have been face
>>> with several limitation in particular in performing Join on KTable.
>>>
>>>
>>>
>>> I was wondering what is the appraoch in Flink to achieve  (1) the
>>> concept of KTable, i.e. a Table that represent a changeLog, i.e. only the
>>> latest version of all keyed records,  and (2) joining those.
>>>
>>>
>>>
>>> There are currently a lot of limitation around that on Kafka Stream, and
>>> i need that for performing some ETL process, where i need to mirror entire
>>> databases in Kafka, and then do some join on the table to emit the logical
>>> entity in Kafka Topics. I was hoping that somehow i could acheive that by
>>> using FLink as intermediary.
>>>
>>>
>>>
>>> I can see that you support any kind of join, but i just don't see the
>>> notion of Ktable.
>>>
>>>
>>>
>>>
>>>
>>>
>>> ------------------------------
>>>
>>>
>>> This message is for the designated recipient only and may contain
>>> privileged, proprietary, or otherwise confidential information. If you have
>>> received it in error, please notify the sender immediately and delete the
>>> original. Any other use of the e-mail by you is prohibited. Where allowed
>>> by local law, electronic communications with Accenture and its affiliates,
>>> including e-mail and instant messaging (including content), may be scanned
>>> by our systems for the purposes of information security and assessment of
>>> internal compliance with Accenture policy. Your privacy is important to us.
>>> Accenture uses your personal data only in compliance with data protection
>>> laws. For further information on how Accenture processes your personal
>>> data, please see our privacy statement at
>>> https://www.accenture.com/us-en/privacy-policy.
>>>
>>> ______________________________________________________________________________________
>>>
>>> www.accenture.com
>>>
>>>