Posted to users@nifi.apache.org by Vikram More <vi...@gmail.com> on 2017/09/19 00:21:28 UTC

Removing duplicates from data

Hi Everyone,

I am new to NiFi and community :)

I am trying to build a NiFi flow which will pull from an Oracle table and load
into a Postgres table. My select query has two columns, and I need to remove
duplicates based on these two columns. Can I remove duplicates in NiFi based
on the values of these two columns? My flow is like below:
ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL


PutSQL question: the Oracle table has ~4 million records, and when PutSQL
was running, it gave several similar errors:

"Failed to update database due to failed batch update. There were total of
1 FlowFiles that failed, 5 that successful, and 9 that were not execute and
will be routed to retry"

What might be wrong in PutSQL? I have kept the PutSQL batch size at 1000, and
I don't have any primary key constraint on the Postgres table.
(Should I create a primary key with those two columns, so that loading can
reject duplicate records? But will it reject the complete batch rather than
just the duplicates?)

Would be great if someone could provide insight into this scenario.

Thanks,
Vikram

Re: Removing duplicates from data

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Vikram,

PutDatabaseRecord has been available since NiFi 1.2.0, so you need to
upgrade your NiFi installation to use it.
Other than PutDatabaseRecord, there are three possible ways to remove
duplicates that I can think of:

1. Disable PutSQL's 'Support Fragmented Transaction' property
2. Use DetectDuplicate; you need to use EvaluateJsonPath beforehand to build
the "Cache Entry Identifier" value from Col A and Col B
3. If your PostgreSQL is 9.5 or higher, use an INSERT ... ON CONFLICT DO
UPDATE statement to perform an 'upsert'

#1: Since you split an Avro dataset via SplitAvro, the fragmented
FlowFiles share the same 'fragment.identifier' attribute, and PutSQL
executes the insert statements for them as a single batch operation.
If there is a primary key constraint violation, the whole batch of insert
statements is rolled back. But if you disable 'Support Fragmented
Transaction', each insert is committed individually, so if you set up the
right primary key constraint on PostgreSQL, you get the expected behavior:
only the duplicated records fail.
However, this approach provides lower database update throughput because it
does not use batch inserts.
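To make the batch-versus-individual behavior concrete, here is a small sketch using Python's sqlite3 as a stand-in for the PostgreSQL target (the table and column names are made up for illustration): with a two-column primary key, a single failed batch commits nothing, while per-row commits lose only the duplicate.

```python
import sqlite3

# Stand-in for the PostgreSQL target table: a two-column primary key.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (col_a TEXT, col_b TEXT, PRIMARY KEY (col_a, col_b))")

rows = [("C1", "item 1"), ("C2", "item 3"), ("C2", "item 3"), ("C3", "item 1")]

# One batch in one transaction (like 'Support Fragmented Transaction' enabled):
# the duplicate aborts the batch and the whole transaction is rolled back.
try:
    with conn:
        conn.executemany("INSERT INTO t VALUES (?, ?)", rows)
except sqlite3.IntegrityError:
    pass
batch_count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
print("after batch attempt:", batch_count)   # 0 - nothing committed

# One transaction per row (the property disabled): only the duplicate fails.
for row in rows:
    try:
        with conn:
            conn.execute("INSERT INTO t VALUES (?, ?)", row)
    except sqlite3.IntegrityError:
        pass  # duplicate rejected; previously committed rows are kept
row_count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
print("after per-row inserts:", row_count)   # 3 unique rows
```

The per-row loop trades throughput for per-record failure isolation, which is exactly the trade-off described above.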

#2: NiFi also has a DetectDuplicate processor; please see the processor usage
docs for details.
In short, you would extract the Col A and Col B values into FlowFile
attributes using EvaluateJsonPath, e.g. col.a and col.b, then use those as
the "Cache Entry Identifier" at DetectDuplicate with NiFi Expression
Language, for example ${col.a}::${col.b}.
DetectDuplicate can then route a FlowFile to 'duplicate' if the same col.a
and col.b value pair has already been seen (the 2nd "C2::item 3" in your
example data set).
This approach is limited by the number of entries that can be cached: if the
same key arrives after the previous one has already been invalidated from the
cache, it can't be detected.
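The composite-key idea, and the cache-size limitation, can be sketched like this. This is a toy model, not NiFi's actual implementation; the class name and default cache size are invented for illustration.

```python
from collections import OrderedDict

class DuplicateDetector:
    """Toy model of DetectDuplicate: a bounded cache of composite keys."""

    def __init__(self, max_entries=10000):
        self.cache = OrderedDict()
        self.max_entries = max_entries

    def is_duplicate(self, col_a, col_b):
        # Build the "Cache Entry Identifier", like ${col.a}::${col.b}.
        key = f"{col_a}::{col_b}"
        if key in self.cache:
            self.cache.move_to_end(key)
            return True
        self.cache[key] = True
        if len(self.cache) > self.max_entries:
            # Evict the oldest entry: this is the limitation mentioned above;
            # an evicted key seen again will not be detected as a duplicate.
            self.cache.popitem(last=False)
        return False

d = DuplicateDetector()
results = [d.is_duplicate(a, b) for a, b in
           [("C2", "item 3"), ("C2", "item 4"), ("C2", "item 3")]]
print(results)  # [False, False, True]
```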

#3: I personally haven't tested this, but you may be able to construct a
flow that executes an 'upsert' operation so that you don't have to worry
about duplicates.
https://stackoverflow.com/questions/17267417/how-to-upsert-merge-insert-on-duplicate-update-in-postgresql
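For reference, a minimal sketch of the upsert approach, run here on SQLite (which adopted the same INSERT ... ON CONFLICT syntax in version 3.24) as a stand-in for PostgreSQL 9.5+. The table and column names are hypothetical; on PostgreSQL the statement would look the same.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE items (
    col_a TEXT, col_b TEXT, loaded_count INTEGER DEFAULT 1,
    PRIMARY KEY (col_a, col_b))""")

# On conflict with the (col_a, col_b) key, update instead of failing.
upsert = """
INSERT INTO items (col_a, col_b) VALUES (?, ?)
ON CONFLICT (col_a, col_b)
DO UPDATE SET loaded_count = loaded_count + 1
"""
for row in [("C2", "item 3"), ("C2", "item 4"), ("C2", "item 3")]:
    conn.execute(upsert, row)
conn.commit()

rows = conn.execute(
    "SELECT col_a, col_b, loaded_count FROM items ORDER BY col_b").fetchall()
print(rows)  # [('C2', 'item 3', 2), ('C2', 'item 4', 1)]
```

Because the duplicate row becomes an update rather than a constraint violation, the whole batch can still be committed in one transaction.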

Thanks,
Koji

On Tue, Sep 19, 2017 at 1:18 PM, Vikram More <vi...@gmail.com>
wrote:


Re: Removing duplicates from data

Posted by Vikram More <vi...@gmail.com>.
Could not find 'PutDatabaseRecord' in the NiFi version I am using:
1.1.0.2.1.2.0-10. Please suggest?

On Tue, Sep 19, 2017 at 12:10 AM, Vikram More <vi...@gmail.com>
wrote:


Re: Removing duplicates from data

Posted by Vikram More <vi...@gmail.com>.
Hi Koji,
Thanks for the response and helpful links!

NiFi version : 1.1.0.2.1.2.0-10

I am trying to move data from an operational system (Oracle DB) to an
analytical system (Postgres DB). The Postgres table has been modeled/designed
by us (and we can add a primary key). The data from Oracle looks like below
(I need to remove duplicate records for the combination of Col A and Col B):

Col A | Col B
------+-------
C1    | item 1
C1    | item 2
*C2*  | *item 3*
*C2*  | *item 4*
*C2*  | *item 3*
C3    | item 1
C4    | null
C5    | item 5
C6    | item 7
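In other words, the goal is to keep only the first occurrence of each (Col A, Col B) pair. A quick sketch of that dedup logic in Python, using the sample rows above:

```python
# Sample rows from the Oracle extract; the second ("C2", "item 3") is the
# duplicate to drop.
rows = [
    ("C1", "item 1"), ("C1", "item 2"),
    ("C2", "item 3"), ("C2", "item 4"), ("C2", "item 3"),
    ("C3", "item 1"), ("C4", None),
    ("C5", "item 5"), ("C6", "item 7"),
]

# Keep the first occurrence of each (Col A, Col B) pair.
seen = set()
deduped = []
for pair in rows:
    if pair not in seen:
        seen.add(pair)
        deduped.append(pair)

print(len(rows), "->", len(deduped))  # 9 -> 8
```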
I will try to explore the PutDatabaseRecord processor and see if I can
achieve the desired purpose.

Thanks,
Vikram

On Mon, Sep 18, 2017 at 9:59 PM, Koji Kawamura <ij...@gmail.com>
wrote:


Re: Removing duplicates from data

Posted by Koji Kawamura <ij...@gmail.com>.
Hello Vikram,

Welcome to NiFi and the community :)

Would you elaborate on your data flow? And which version are you using?
For example, can you share some input data extracted from Oracle? I
wonder why you need to remove duplicate records when PostgreSQL
doesn't have a primary key constraint, or why you have such records in
the first place.

The current PutSQL does not report the cause of a batch update failure well.
That behavior has been improved, and you can see the cause if you use
NiFi 1.4.0-SNAPSHOT (you need to build NiFi from source code to try it).
https://issues.apache.org/jira/browse/NIFI-4162

Please refer to the NiFi README.md for how to build and run NiFi from source code.
https://github.com/apache/nifi

Also, in order to put Avro data into an RDBMS, NiFi now has the
PutDatabaseRecord processor, which can work more efficiently: you don't
need the 'split avro -> avrotojson -> jsontosql' part, because
PutDatabaseRecord can execute DML statements directly from an Avro
dataset.
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.PutDatabaseRecord/index.html

Thanks,
Koji

On Tue, Sep 19, 2017 at 9:21 AM, Vikram More <vi...@gmail.com> wrote: