Posted to users@nifi.apache.org by Boris Tyukin <bo...@boristyukin.com> on 2019/02/22 14:06:45 UTC

join two datasets

Hi guys,

I pull two datasets from two different databases on a schedule and need to
join both on some ID and then publish the combined dataset to Kafka.

What is the best way to do this? I am puzzled about how to synchronize the
two data pulls so that the join uses exactly the flowfiles I need, i.e. if
there are errors anywhere, I do not want to join an older flowfile with a
newer one.

Thanks!
Boris

Re: join two datasets

Posted by Mike Thomsen <mi...@gmail.com>.
Boris,

We also use them for data cleanup. A common pattern I established on my
team is to script out a service with ScriptedLookupService and use it to
either regenerate a missing field from other fields or rewrite a field with
bad data.
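
A minimal sketch of the cleanup idea described above, written as plain Python rather than against the ScriptedLookupService API. The field names (`first_name`, `last_name`, `full_name`, `country`) and the `COUNTRY_FIXES` table are hypothetical and only illustrate "regenerate a missing field" and "rewrite a field with bad data".

```python
# Hypothetical mapping of known-bad country spellings to a canonical code.
COUNTRY_FIXES = {"USA": "US", "U.S.": "US"}


def clean_record(record):
    """Return a cleaned copy of one record (a plain dict)."""
    fixed = dict(record)

    # Regenerate a missing field from other fields.
    if not fixed.get("full_name"):
        parts = [fixed.get("first_name"), fixed.get("last_name")]
        fixed["full_name"] = " ".join(p for p in parts if p)

    # Rewrite a field that carries bad data.
    country = (fixed.get("country") or "").strip().upper()
    fixed["country"] = COUNTRY_FIXES.get(country, country)
    return fixed


cleaned = clean_record(
    {"first_name": "Ada", "last_name": "Lovelace", "full_name": "", "country": " usa "}
)
```

In a real flow the same logic would live inside the scripted service, so that LookupRecord can apply it record by record.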


Re: join two datasets

Posted by Boris Tyukin <bo...@boristyukin.com>.
Awesome, thanks, guys! I will try both options, but lookup makes a lot of
sense and will probably be easier to support and understand.

We are planning to get NiFi 1.9 soon too, and are really excited about all
the new features, especially load balancing connections and the Hive 1.1
processor. Which is funny because we just created our own to work with Hive
on CDH.

Kudu-based lookup also sounds great - we love Kudu and started using it
recently for real-time replication of Oracle databases into our cluster.

Boris

Re: join two datasets

Posted by Mike Thomsen <mi...@gmail.com>.
@Boris

Mark's approach will work for a lot of scenarios. I've used it extensively
with different clients.


Re: join two datasets

Posted by Mark Payne <ma...@hotmail.com>.
This is certainly a better route to go than my previous suggestion :) Have one flow that grabs one of the
datasets and stores it somewhere, even in a CSV or XML file. Then have a second flow that pulls the other
dataset and uses LookupRecord to perform the enrichment. The CSVLookupService and XMLLookupService would
automatically reload when the data is updated. We should probably have a JDBCLookupService as well, which
would allow for dynamic lookups against a database. I thought that existed already, but it does not appear
to. The point is, you can look at DataSet A as the 'reference dataset' and DataSet B as the 'streaming
dataset' and then use LookupRecord to do the enrichment/join.

Unfortunately, I can't seem to find any blogs that describe this pattern, but it would certainly make for
a good blog. Generally, though, you'd have two flows set up:

Flow A (get the enrichment dataset):
ExecuteSQLRecord (write as CSV) -> PutFile

Flow B (enrich the other dataset):
ExecuteSQLRecord -> LookupRecord (uses a CSVLookupService that loads the file written by the other flow) -> PublishKafkaRecord_2_0

Thanks
-Mark
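
The reference/streaming split in the two flows above amounts to a keyed lookup join. The sketch below shows that idea in plain Python; it is not NiFi code. The column names (`id`, `region`, `amount`) and the helpers `load_reference` and `enrich` are made up for illustration, and the in-memory CSV string stands in for the file that Flow A would write.

```python
import csv
import io


def load_reference(csv_text, key_field):
    """Index the reference dataset (Flow A's output) by its join key."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row[key_field]: row for row in reader}


def enrich(records, reference, key_field):
    """Left-join each incoming record (Flow B) against the reference index."""
    for record in records:
        merged = dict(record)
        match = reference.get(record[key_field])
        if match is not None:
            for name, value in match.items():
                # Keep the streaming record's own values on a name clash.
                merged.setdefault(name, value)
        yield merged


# Hypothetical sample data standing in for the two database pulls.
REFERENCE_CSV = "id,region\n1,east\n2,west\n"
STREAM = [{"id": "1", "amount": "10"}, {"id": "3", "amount": "7"}]

reference = load_reference(REFERENCE_CSV, "id")
enriched = list(enrich(STREAM, reference, "id"))
```

Records without a match pass through unchanged here. Whether unmatched records should be passed on, dropped, or routed elsewhere is a choice to make in the actual flow.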


Re: join two datasets

Posted by Joe Witt <jo...@gmail.com>.
I should add that you can use NiFi to update the reference dataset in a
database/backing store in one flow, and have another flow that handles the
live stream/lookup, etc.  Mark Payne/others: I think there are blogs that
describe this pattern.  Anyone have links?


Re: join two datasets

Posted by Joe Witt <jo...@gmail.com>.
Boris,

Great.  So have a process to load the periodic dataset into a lookup
service.  It could be backed by a simple file, a database, Hive, whatever.
Then have the live flow run against that.

This reminds me - we should make a Kudu-based lookup service, I think.  I'll
chat with some of our new Kudu friends on this.

Thanks


Re: join two datasets

Posted by Mark Payne <ma...@hotmail.com>.
Boris,

I would echo the cautions from Bryan & Joe. However, you could conceivably achieve this by extracting some id
into an attribute that would associate the two FlowFiles (for example 'dataset.id'). Use MergeRecord or MergeContent
to merge the data together using that as a correlation attribute or using the Defragment mode. This would get data from both
datasets into the same FlowFile. Then use QueryRecord with the COALESCE function and GROUP BY in order to join
together the columns from both datasets.

Your schema would need to accommodate all of the fields in both datasets, but if you're running 1.9.0, the schema inference
should handle that...
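
One way to read the COALESCE and GROUP BY suggestion is sketched below. Python's built-in sqlite3 module stands in for QueryRecord's SQL engine, so exact syntax and NULL handling may differ in NiFi, where the query would select from FLOWFILE rather than from a named table. The table `merged` and the columns `name` and `balance` are hypothetical. The merged FlowFile is modeled as one table in which each row carries only the columns of the dataset it came from.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE merged (id INTEGER, name TEXT, balance REAL)")

# Rows from dataset A fill `name`; rows from dataset B fill `balance`.
conn.executemany(
    "INSERT INTO merged VALUES (?, ?, ?)",
    [
        (1, "alice", None),
        (2, "bob", None),
        (1, None, 12.5),
    ],
)

# GROUP BY collapses the rows that share an id; MAX skips NULLs, and
# COALESCE supplies a default when one side had no row for that id.
rows = conn.execute(
    """
    SELECT id,
           COALESCE(MAX(name), 'unknown') AS name,
           COALESCE(MAX(balance), 0.0)    AS balance
    FROM merged
    GROUP BY id
    ORDER BY id
    """
).fetchall()
conn.close()
```

The defaults ('unknown', 0.0) are placeholders. If an unmatched id should be treated as an error instead, drop the COALESCE and check for NULLs downstream.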




Re: join two datasets

Posted by Boris Tyukin <bo...@boristyukin.com>.
Thanks, Joe and Bryan. In this case I don't need to do it in real time,
probably only once a day.

I am thinking of triggering both pulls with a GenerateFlowFile processor,
then merging the datasets somehow, since the flowfile id will be the same
for both sets. And then I need to join them somehow.

Would like to use NiFi still :)

Re: join two datasets

Posted by Joe Witt <jo...@gmail.com>.
Right, I agree with Bryan, so let me expand a bit.

There are some key primitives for joining two live streams that stream
processing systems are designed to solve well.  NiFi offers nothing
special/unique in that space.

Now, as Bryan pointed out, a really common case in NiFi is to have a
reference/lookup dataset accessible through one of our LookupService
implementations (and you can write your own).  The typical use case is a
live stream of event data coming in with a customer id, IP address, or some
other lookup value, where you want to look up that value against a
reference dataset and merge in the results, such as geolocation or customer
address/billing data.  This is very doable.

The key difference here is the liveness of the streams of information.  If
they're both always changing/updating/morphing, then you want a full
complement of stream join features.  If one of the datasets is updated
infrequently, like geolocation data, customer information, etc., then our
existing capabilities work quite well.

You might then say, 'Well, why don't I just use a stream processing system
for all this?'  That ties into the fact that those systems are designed for
different tradeoffs in memory and in APIs to handle small/large objects,
lack provenance, etc.  NiFi is a stream processing system with a bias
toward managing the flow of information, large/small/fast/batch/etc.,
whereas stream processing/analytics systems are built with a bias toward
analytic processing, and often run in memory or have key points with
checkpointing into Kafka or other systems (which also implies a certain
per-event size, etc.).

Hopefully this distinction helps.

Thanks
Joe


Re: join two datasets

Posted by Bryan Bende <bb...@gmail.com>.
Hi Boris,

Joining across two different data streams is not really something NiFi
is aiming to solve.

Generally I think we'd say that you'd use one of the stream processing
systems like Flink, Spark, Storm, etc.

Another possible option might be to pull the data and land it in a
common location like Hive, then you can run a single query against
Hive that joins the tables.

Others may have more experience with solving this than I do, so
curious to hear other approaches people have taken.

-Bryan
