You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by miki haiat <mi...@gmail.com> on 2018/05/15 14:37:31 UTC

Re: data enrichment with SQL use case

HI guys ,
This is how i tried to  solve my enrichment case
https://gist.github.com/miko-code/d615aa05b65579f4366ba9fe8a8275fd
<https://gist.github.com/miko-code/d615aa05b65579f4366ba9fe8a8275fd>
Currently we need to use *keyby()* before the process  function.
My concern is if i have  in flight  N messages with the same key the
process function will execute once or N times ?

Thanks,

MIki

On Thu, Apr 26, 2018 at 1:24 PM Fabian Hueske <fh...@gmail.com> wrote:

> Hi all,
>
> @Ken, the approach of telling the operator which input to read from would
> cause problems with the current checkpointing mechanism because checkpoint
> barriers are not allowed to overtake regular records. Chaining wouldn't be
> an issue, because operators with two inputs are not chained to their
> predecessors.
>
> The side inputs are exactly the effort to address these use cases. Im not
> 100% into the details, but AFAIK, there are some improvements to the
> checkpointing mechanism that need to be solved before side input can be
> implemented.
> Side inputs will also support to initially read side inputs (blocking all
> other streams) and starting the other other streams once the initialization
> is completed. Afterwards the side inputs will still be able to provide
> updates.
>
> Buffering records in a function does not necessarily lead to OOME. If the
> stream is keyed, you can put the state into a RocksDBStateBackend and write
> it to disk.
>
> Best, Fabian
>
> 2018-04-25 23:36 GMT+02:00 Ken Krugler <kk...@transpac.com>:
>
>> Hi Michael,
>>
>> Windowing works when you’re joining timestamped metadata and non-metadata.
>>
>> The common case I’m referring to is where there’s some function state
>> (e.g. rules to process data, machine learning models, or in my case
>> clusters), where you want to process the non-metadata with the "current
>> state”.
>>
>> In that case, blindly applying whatever metadata has been collected to
>> incoming non-metadata often doesn’t work well. That’s why Fabian was
>> suggesting various approaches (below) to work around the problem. The
>> general solution (his option #2, with buffering) will work, but can lead to
>> OOME and feels like it breaks the basic Flink back-pressure mechanism, due
>> to in-operator buffering.
>>
>> If it was possible to essentially allow Flink to block (or not pull, for
>> sources) from the non-metadata stream when appropriate, then no buffering
>> would be needed. Then it would be straightforward to do things like…
>>
>> - drain all metadata from a Kafka topic before applying that to the other
>> stream.
>> - defer processing data from the other stream if there was newer metadata.
>>
>> As an aside, what I’m seeing with Flink 1.5 and using a connected keyed &
>> broadcast stream is that the CoFlatMapFunction seems to be giving priority
>> to data going to the flatMap1() method, though this could be an odd side
>> effect of how iterations impact the two streams.
>>
>> — Ken
>>
>>
>> On Apr 25, 2018, at 1:09 PM, TechnoMage <ml...@technomage.com> wrote:
>>
>> I agree in the general case you need to operate on the stream data based
>> on the metadata you have.  The side input feature coming some day may help
>> you, in that it would give you a means to receive inputs out of band.  But,
>> given changing metadata and changing stream data I am not sure this is any
>> different from dual stream data inputs.  Either you use windowing to do
>> small batches of data to allow coordination of stream and metadata, or you
>> use the metadata you have collected to date on receipt of the stream data.
>> Given flink will do record by record processing you have the option of
>> controlling the timing as needed for your use case.
>>
>> Michael
>>
>> On Apr 25, 2018, at 1:57 PM, Ken Krugler <kk...@transpac.com>
>> wrote:
>>
>> Hi Michael,
>>
>> I agree there are cases where it’s possible to implement a solution via
>> buffering.
>>
>> But this case of using broadcast state to update a function operating on
>> streaming data seems common enough that it would be useful for Flink to
>> provide some help.
>>
>> Additionally, even with buffering there are currently challenges...
>>
>> 1. For the case I’m dealing with (iterative KMeans clustering) you don’t
>> have a time when "metadata is aggregated", as it’s constantly evolving.
>>
>> 2. It’s sometimes not possible to know when you’ve received all of the
>> metadata (e.g. if you’re reading from a Kafka topic).
>>
>> 3. Buffering the non-metadata can create an unbounded memory issue.
>>
>> Regards,
>>
>> — Ken
>>
>>
>> On Apr 25, 2018, at 12:39 PM, Michael Latta <la...@me.com> wrote:
>>
>> Using a flat map function, you can always buffer the non-meta data stream
>> in the operator state until the metadata is aggregated, and then process
>> any collected data.  It would require a RichFlatMap to hold data.
>>
>> Michael
>>
>> On Apr 25, 2018, at 1:20 PM, Ken Krugler <kk...@transpac.com>
>> wrote:
>>
>> Hi Fabian,
>>
>> On Apr 24, 2018, at 3:01 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>> Hi Alex,
>>
>> An operator that has to join two input streams obviously requires two
>> inputs. In case of an enrichment join, the operator should first read the
>> meta-data stream and build up a data structure as state against which the
>> other input is joined. If the meta data is (infrequently) updated, these
>> updates should be integrated into the state.
>>
>> The problem is that it is currently not possible to implement such an
>> operator with Flink because operators cannot decide from which input to
>> read, i.e., they have to process whatever data is given to them.
>> Hence, it is not possible to build up a data structure from the meta data
>> stream before consuming the other stream.
>>
>>
>> This seems like a common situation, and one where it might be relatively
>> easy for Flink to help resolve.
>>
>> Specifically, for a connected stream feeding a Co(Flat)MapFunction, it
>> seems like we could let Flink know how to pick elements from the two
>> network buffers - e.g. random, round robin, or by timestamp.
>>
>> I don’t know how this works with chained operators, but it does seem a
>> bit odd to have operators create buffers of elements when (network) buffers
>> often already exist.
>>
>> If there’s no network buffers in play (e.g. there’s a direct chain of
>> operators from a source) then it could be something that’s not supported,
>> though with the future source-pull architecture that would also be easy to
>> resolve.
>>
>> Anyway, I could take a whack at this if it seems reasonable.
>>
>> — Ken
>>
>>
>>
>>
>> There are a few workarounds that work in special cases.
>> 1) The meta data is rather small and never updated. You put the meta data
>> as a file into a (distributed) file system an read it from each function
>> instance when it is initialized, i.e., in open(), and put into a hash map.
>> Each function instance will hold the complete meta data in memory (on the
>> heap). Since the meta data is broadcasted, the other stream does not need
>> to be partitioned to join against the meta data in the hash map. You can
>> implement this function as a FlatMapFunction or ProcessFunction.
>> 2) The meta data is too large and/or is updated. In this case, you need a
>> function with two inputs. Both inputs are keyed (keyBy()) on a join
>> attribute. Since you cannot hold back the non-meta data stream, you need to
>> buffer it in (keyed) state until you've read the meta data stream up to a
>> point when you can start processing the other stream. If the meta data is
>> updated at some point, you can just add the new data to the state. The
>> benefits of this approach is that the state is shared across all operators
>> and can be updated. However, you might need to initially buffer quite a bit
>> of data in state if the non-meta data stream has a high volume.
>>
>> Hope that one of these approaches works for your use case.
>>
>> Best, Fabian
>>
>> 2018-04-23 13:29 GMT+02:00 Alexander Smirnov <
>> alexander.smirnoff@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> please share the workarounds, that must be helpful for my case as well
>>>
>>> Thank you,
>>> Alex
>>>
>>> On Mon, Apr 23, 2018 at 2:14 PM Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi Miki,
>>>>
>>>> Sorry for the late response.
>>>> There are basically two ways to implement an enrichment join as in your
>>>> use case.
>>>>
>>>> 1) Keep the meta data in the database and implement a job that reads
>>>> the stream from Kafka and queries the database in an ASyncIO operator for
>>>> every stream record. This should be the easier implementation but it will
>>>> send one query to the DB for each streamed record.
>>>> 2) Replicate the meta data into Flink state and join the streamed
>>>> records with the state. This solution is more complex because you need
>>>> propagate updates of the meta data (if there are any) into the Flink state.
>>>> At the moment, Flink lacks a few features to have a good implementation of
>>>> this approach, but there a some workarounds that help in certain cases.
>>>>
>>>> Note that Flink's SQL support does not add advantages for the either of
>>>> both approaches. You should use the DataStream API (and possible
>>>> ProcessFunctions).
>>>>
>>>> I'd go for the first approach if one query per record is feasible.
>>>> Let me know if you need to tackle the second approach and I can give
>>>> some details on the workarounds I mentioned.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-04-16 20:38 GMT+02:00 Ken Krugler <kk...@transpac.com>:
>>>>
>>>>> Hi Miki,
>>>>>
>>>>> I haven’t tried mixing AsyncFunctions with SQL queries.
>>>>>
>>>>> Normally I’d create a regular DataStream workflow that first reads
>>>>> from Kafka, then has an AsyncFunction to read from the SQL database.
>>>>>
>>>>> If there are often duplicate keys in the Kafka-based stream, you could
>>>>> keyBy(key) before the AsyncFunction, and then cache the result of the SQL
>>>>> query.
>>>>>
>>>>> — Ken
>>>>>
>>>>> On Apr 16, 2018, at 11:19 AM, miki haiat <mi...@gmail.com> wrote:
>>>>>
>>>>> HI thanks  for the reply  i will try to break your reply to the flow
>>>>> execution order .
>>>>>
>>>>> First data stream Will use AsyncIO and select the table ,
>>>>> Second stream will be kafka and the i can join the stream and map it ?
>>>>>
>>>>> If that   the case  then i will select the table only once on load ?
>>>>> How can i make sure that my stream table is "fresh" .
>>>>>
>>>>> Im thinking to myself , is thire a way to use flink backend (ROKSDB)
>>>>> and create read/write through
>>>>> macanisem ?
>>>>>
>>>>> Thanks
>>>>>
>>>>> miki
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler <
>>>>> kkrugler_lists@transpac.com> wrote:
>>>>>
>>>>>> If the SQL data is all (or mostly all) needed to join against the
>>>>>> data from Kafka, then I might try a regular join.
>>>>>>
>>>>>> Otherwise it sounds like you want to use an AsyncFunction to do ad
>>>>>> hoc queries (in parallel) against your SQL DB.
>>>>>>
>>>>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>>>>>>
>>>>>> — Ken
>>>>>>
>>>>>>
>>>>>> On Apr 15, 2018, at 12:15 PM, miki haiat <mi...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a case of meta data enrichment and im wondering if my approach
>>>>>> is the correct way .
>>>>>>
>>>>>>    1. input stream from kafka.
>>>>>>    2. MD in msSQL .
>>>>>>    3. map to new pojo
>>>>>>
>>>>>> I need to extract  a key from the kafka stream   and use it to select
>>>>>> some values from the sql table  .
>>>>>>
>>>>>> SO i thought  to use  the table SQL api in order to select the table
>>>>>> MD
>>>>>> then convert the kafka stream to table and join the data by  the
>>>>>> stream key .
>>>>>>
>>>>>> At the end i need to map the joined data to a new POJO and send it to
>>>>>> elesticserch .
>>>>>>
>>>>>> Any suggestions or different ways to solve this use case ?
>>>>>>
>>>>>> thanks,
>>>>>> Miki
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --------------------------
>>>>>> Ken Krugler
>>>>>> http://www.scaleunlimited.com
>>>>>> custom big data solutions & training
>>>>>> Hadoop, Cascading, Cassandra & Solr
>>>>>>
>>>>>>
>>>>>
>>>>> --------------------------------------------
>>>>> http://about.me/kkrugler
>>>>> +1 530-210-6378 <(530)%20210-6378>
>>>>>
>>>>>
>>>>
>>
>> --------------------------------------------
>> http://about.me/kkrugler
>> +1 530-210-6378
>>
>>
>>
>> --------------------------------------------
>> http://about.me/kkrugler
>> +1 530-210-6378
>>
>>
>>
>> --------------------------------------------
>> http://about.me/kkrugler
>> +1 530-210-6378
>>
>>
>