Posted to users@nifi.apache.org by Faisal Durrani <te...@gmail.com> on 2019/01/18 02:41:38 UTC

Re: Ingesting golden gate messages to Hbase using Nifi

Hi Timothy, Thank you for your input.

Hi Boris,
Hope you are doing well. I wanted to update you with the solution that
finally worked for me. I found that the current_ts column of the GoldenGate
message is unique across all transactions and, in addition, carries
sub-second (microsecond) precision. I also found that HBase can store a
long integer as the cell timestamp. Based on those facts, I used the
expressions below to generate the Unix timestamp:

current_ts_ms: ${current_ts:substring(20,26)}
current_ts_ux: ${current_ts:substring(0,19):toDate("yyyy-MM-dd'T'HH:mm:ss","UTC"):toNumber()}
hbase_ct: ${current_ts_ux:multiply(1000):plus(${current_ts_ms})}

In summary, I convert the timestamp (without the fractional-seconds part)
into a Unix number and then simply add the remaining fractional digits. That
sub-second precision gives the proper ordering of the events.
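
To make the arithmetic concrete, here is a minimal Java sketch of the same
computation on a hypothetical current_ts value. Note that substring(20,26)
picks up six fractional digits (microseconds), so the resulting long is
effectively an epoch value at microsecond resolution, which still works
because HBase accepts any long as a cell timestamp and only its ordering
matters here:

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class GgTimestampDemo {
    public static void main(String[] args) {
        // Hypothetical GoldenGate current_ts value (ISO-8601, microsecond precision)
        String currentTs = "2019-01-18T02:41:38.123456";

        // substring(20,26): the six fractional digits
        long fraction = Long.parseLong(currentTs.substring(20, 26)); // 123456

        // substring(0,19) parsed as UTC, then epoch milliseconds,
        // mirroring toDate(...):toNumber() in NiFi Expression Language
        LocalDateTime ldt = LocalDateTime.parse(currentTs.substring(0, 19),
                DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
        long epochMillis = ldt.toEpochSecond(ZoneOffset.UTC) * 1000L;

        // multiply(1000):plus(fraction) -> microsecond-resolution long
        long hbaseCellTs = epochMillis * 1000L + fraction;
        System.out.println(hbaseCellTs); // 1547779298123456
    }
}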

Regards,
Faisal



On Tue, Nov 13, 2018 at 3:18 AM Timothy Spann <ti...@agilemobiledeveloper.com>
wrote:

> Enforcing order is really tricky with Kafka.   The only way to enforce
> order is to reduce the # of nodes processing.  You can have one NiFi master
> node read from Kafka and have it distribute the workload to other NiFi
> nodes and force ordering.  Or you may want to batch them up into say 10-15
> minute chunks.   Or you could use a staging table.
>
> You could also have something mark the order to make sure they run in
> order.   I am not sure if Golden Gate can annotate them.   I think there is
> a Kafka # that could help.
>
>
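For context on the ordering point above: Kafka preserves order only within a
single partition, so a common way to get per-row ordering is to key the GG
messages by the source row's primary key, routing all changes for a row to
the same partition. A minimal Java sketch (broker, topic, and key are
hypothetical):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedSendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keying by primary key keeps all changes for one row in one
            // partition, where Kafka guarantees append order.
            producer.send(new ProducerRecord<>("gg.table_a", "pk-1001", "{...}"));
        }
    }
}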
> On Mon, Nov 12, 2018 at 12:16 PM Boris Tyukin <bo...@boristyukin.com>
> wrote:
>
>> Faisal, BTW I stumbled upon this doc that explains how the GoldenGate
>> HBase handler works in a scenario similar to the one you've described:
>>
>> https://docs.oracle.com/goldengate/bd123210/gg-bd/GADBD/using-hbase-handler.htm#GADBD-GUID-1A9BA580-628B-48BD-9DC0-C3DF9722E0FB
>>
>> They provide an option to generate the timestamp for HBase on the client
>> side - which is what I suggested earlier. In your case, you would need to
>> build this logic in NiFi. I still think the op_ts,pos combo should give
>> you a proper ordering of events (so events sorted by op_ts and then by
>> pos). Then you can come up with a rule to increment the actual timestamp
>> for HBase by a millisecond, like Oracle does with their HBase handler.
>>
>> Really interested what you end up doing, please share once you come up
>> with a solution.
>>
>> Boris
>>
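One way to apply the op_ts + pos idea above: sort each batch of events by
(op_ts, pos), then bump any assigned timestamp that is not strictly greater
than its predecessor by one millisecond, mirroring what the Oracle HBase
handler is described as doing. A minimal Java sketch with a hypothetical
record type (pos is compared lexicographically here; real trail-file
positions may need numeric parsing):

import java.util.Comparator;
import java.util.List;

// Hypothetical shape of a parsed GG JSON message
record GgEvent(long opTsMillis, String pos) {}

public class OrderingDemo {
    // Returns one distinct, monotonically increasing cell timestamp per event.
    static long[] assignTimestamps(List<GgEvent> events) {
        events.sort(Comparator.comparingLong(GgEvent::opTsMillis)
                              .thenComparing(GgEvent::pos));
        long[] ts = new long[events.size()];
        long last = Long.MIN_VALUE;
        for (int i = 0; i < events.size(); i++) {
            long t = Math.max(events.get(i).opTsMillis(), last + 1);
            ts[i] = t;
            last = t;
        }
        return ts;
    }
}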
>> On Wed, Nov 7, 2018 at 7:53 AM Boris <bo...@gmail.com> wrote:
>>
>>> Sorry, I meant RBA. GG has a bunch of tokens you can add to your json
>>> file - you can even create your own. POS should be good, and if op_ts
>>> does not work for you, why not generate your own timestamp using POS?
>>> (Now() expression). You could also add another token that identifies the
>>> transaction sequence number and order by op_ts and then by transaction
>>> sequence number. Please share what you end up doing.
>>>
>>> On Tue, Nov 6, 2018, 01:55 Faisal Durrani <te04.0172@gmail.com> wrote:
>>>
>>>> Hi Boris,
>>>>
>>>> Thank you for your reply. Let me try explaining my data flow in
>>>> detail. I am receiving the GG transactions in JSON format through Kafka,
>>>> so I can only use the fields provided by the Kafka handler of GG (JSON
>>>> pluggable format). I think you meant the RBA value instead of rbc. I
>>>> don't think we can receive the RBA value in the JSON, but there is a
>>>> field called POS which is a concatenation of the source trail file
>>>> number and the RBA. So we can probably use that in the EnforceOrder
>>>> processor. But if we don't use the timestamp information, then we will
>>>> run into the HBase versioning issue. The idea behind using op_ts was to
>>>> version each row of our target table and also help us with the DML
>>>> operations. We are using the PK of each table as the row key of the
>>>> target HBase table. Every new transaction (update/delete) on the table
>>>> is logically inserted as a new row, but since it has the same pkey, we
>>>> can see each version of the row. The operation with the highest
>>>> timestamp is the valid state of the row. I tested the EnforceOrder
>>>> processor with the Kafka offset, and it skips all the records that
>>>> arrive after a newer offset has been seen, which I don't understand. If
>>>> I decide to use EnforceOrder on POS and use the default timestamp in
>>>> HBase, then it will skip the late-arriving Kafka messages instead of
>>>> ordering them, and that will cause the tables to go out of sync. In
>>>> addition to this, I've read that EnforceOrder only orders the flow
>>>> files on a single node, while we have a 5-node cluster. So I'm not sure
>>>> how I combine all the flow files together on a single node. (I know how
>>>> to distribute them, i.e., by using S2S-RPG.)
>>>>
>>>> I hope I have been able to explain my situation. Kindly let me know
>>>> your views on this.
>>>>
>>>> Regards,
>>>> Faisal
>>>>
>>>>
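To illustrate the versioning behavior described above: HBase lets the client
supply the cell timestamp as an arbitrary long, and reads return the version
with the highest timestamp by default. A minimal sketch using the HBase Java
client (table name, column family, and values are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VersionedPutDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("target_table"))) {
            byte[] rowKey = Bytes.toBytes("pk-1001"); // source table PK as row key
            long cellTs = 1547779298123456L;          // client-supplied long timestamp
            Put put = new Put(rowKey);
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("status"),
                          cellTs, Bytes.toBytes("UPDATED"));
            table.put(put); // a later put with a higher cellTs becomes the visible version
        }
    }
}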
>>>> On Mon, Nov 5, 2018 at 11:18 PM Boris Tyukin <bo...@boristyukin.com>
>>>> wrote:
>>>>
>>>>> Hi Faisal, I am not Timothy, but you raise an interesting problem we
>>>>> might face soon as well. I did not expect the situation you described and I
>>>>> thought transaction time would be different.
>>>>>
>>>>> Our intent was to use op_ts to enforce order, but another option is to
>>>>> use the GG rbc value or the Oracle rowscn value - did you consider
>>>>> them? GG RBC should identify a unique transaction, and you can also get
>>>>> the operation # within every transaction. You can also get the trail
>>>>> file # and trail file position. GG is really powerful and gives you a
>>>>> bunch of data elements that you can enable on your message.
>>>>>
>>>>>
>>>>> https://docs.oracle.com/goldengate/1212/gg-winux/GWUAD/wu_fileformats.htm#GWUAD735
>>>>>
>>>>> The Logdump tool is an awesome way to look into your trail files and
>>>>> see what's in there.
>>>>>
>>>>> Boris
>>>>>
>>>>>
>>>>>
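For readers following along: a GG Kafka-handler JSON message carries the
fields discussed in this thread roughly as below (values are hypothetical,
and the exact set of fields depends on the formatter configuration):

{
  "table": "SCHEMA.TABLE_A",
  "op_type": "U",
  "op_ts": "2018-11-05 08:01:03.000000",
  "current_ts": "2018-11-05T08:01:05.123456",
  "pos": "00000000020030005321",
  "after": {"ID": "1001", "STATUS": "UPDATED"}
}

Note op_ts (database commit time, seconds precision in practice) versus
current_ts (formatter processing time, microsecond precision) versus pos
(trail file number plus RBA), which is what the ordering discussion above
hinges on.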
>>>>> On Mon, Nov 5, 2018 at 3:07 AM Faisal Durrani <te...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Timothy,
>>>>>>
>>>>>> Hope you are doing well. We have been using your data flow (
>>>>>> https://community.hortonworks.com/content/kbentry/155527/ingesting-golden-gate-records-from-apache-kafka-an.html#
>>>>>> )
>>>>>> with slight modifications to store the data in HBase. To version the
>>>>>> rows, we have been using the op_ts of the GoldenGate JSON. But now we
>>>>>> have found that multiple transactions can have the same op_ts, e.g.
>>>>>> both an update and a delete can have the same op_ts, and if they
>>>>>> arrive out of order at the PutHBaseJSON processor, the target table
>>>>>> can go out of sync. I am using a cluster of NiFi nodes, so I cannot
>>>>>> use the EnforceOrder processor to order the Kafka messages, as I
>>>>>> understand it only orders the flow files on a single node and not
>>>>>> across the cluster. Additionally, we have a separate topic for each
>>>>>> table and we have several consumer groups. I tried using the
>>>>>> current_ts column of the GoldenGate message, but then if GG abends
>>>>>> and restarts the replication, it will resend the past data with a
>>>>>> newer current_ts, which will also cause the tables to go out of sync.
>>>>>> I was wondering if you could give any ideas so that we can order our
>>>>>> transactions correctly.
>>>>>>
>>>>>> Regards,
>>>>>> Faisal
>>>>>>
>>>>>

Re: Ingesting golden gate messages to Hbase using Nifi

Posted by Boris Tyukin <bo...@boristyukin.com>.
Glad it is working for you, Faisal. I am sure you already know that the
current_ts timestamp value will change every time you reprocess GG trail
files in case of GG server failure, checkpoint rollbacks, or when you
reposition GG with a command like ALTER REPLICAT xxx, EXTSEQNO 5, EXTRBA 0
or ALTER EXTRACT.

op_ts is still going to be the same, as it shows the database transaction
time, but you are right that it only has seconds precision, which is really
odd.

current_ts

The current timestamp is a timestamp of the current time when the delimited
text formatter processes the current operation record. This timestamp
follows the ISO-8601 format and includes microsecond precision. Replaying
the trail file will not result in the same timestamp for the same operation.
