Posted to user@spark.apache.org by Sid Kal <fl...@gmail.com> on 2022/01/27 15:46:51 UTC

How to delete the record

I am using a Spark incremental approach to bring in the latest data every
day. Everything works fine.

But now the main problem is that when a record is deleted at the source, it
should be deleted from my final transformed records too.

How do I capture such changes and update my table accordingly?

Best regards,
Sid

Re: How to delete the record

Posted by Sean Owen <sr...@gmail.com>.
Delta, for example, manages merge/append/delete and also keeps previous
states of the table's data, so you can query what it looked like before.
See delta.io
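
For illustration, a minimal PySpark sketch (the table path is a hypothetical
placeholder; it assumes the delta-spark package and an active SparkSession
named spark):

    from delta.tables import DeltaTable

    path = "s3://my-bucket/warehouse/users"  # hypothetical location

    # Delete the row whose source record was removed
    DeltaTable.forPath(spark, path).delete("id = 1")

    # Time travel: read an earlier version of the table (version 0 here)
    before = spark.read.format("delta").option("versionAsOf", 0).load(path)
    before.show()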


Re: How to delete the record

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

I think it will be useful to understand the problem before solving it.

Can I please ask what this table is? Is it a fact (event store) kind of
table, or a dimension (master data) kind of table? And what are the
downstream consumers of this table?

Besides that, what is the unique identifier for a record in this table? For
example, some master data tables use phone numbers as unique identifiers,
which can get reallocated to other individuals over time.

Is there any other information that you can provide on this
table, its contents, usage, etc.?

There is a third option, akin to the second option that Mich mentioned, and
that is basically a database transaction log, which gets very large and
very expensive to store and query over time. Are you creating a database
transaction log?


Thanks and Regards,
Gourav Sengupta



Re: How to delete the record

Posted by ayan guha <gu...@gmail.com>.
Btw, the two options Mich explained are not mutually exclusive. Option 2
can and should be implemented over a Delta Lake table anyway, especially if
you need to do hard deletes eventually (e.g. for regulatory needs).
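
As a rough sketch of what such a hard delete could look like over Delta
(the table name is an assumption, and it presumes the op_type soft-flag
column from Mich's option 2):

    from delta.tables import DeltaTable

    # Hard-delete the rows previously soft-flagged as deleted
    tbl = DeltaTable.forName(spark, "my_table")  # hypothetical table name
    tbl.delete("op_type = 3")

    # Purge old data files so the deleted rows are no longer recoverable
    # via time travel (168 hours = the 7-day Delta default retention)
    tbl.vacuum(retentionHours=168)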



Best Regards,
Ayan Guha

Re: How to delete the record

Posted by Sid Kal <fl...@gmail.com>.
Thanks Mich and Sean for your time


Re: How to delete the record

Posted by Mich Talebzadeh <mi...@gmail.com>.
Yes, I believe so.

Check this article of mine; it dates from early 2019 but has some relevance
to what I am implying.

https://www.linkedin.com/pulse/real-time-data-streaming-big-typical-use-cases-talebzadeh-ph-d-/

HTH



Re: How to delete the record

Posted by Sid Kal <fl...@gmail.com>.
Okay sounds good.

So, the below two options would help me capture CDC changes:

1) Delta Lake
2) Maintaining snapshots of records with some indicators and a timestamp.

Correct me if I'm wrong

Thanks,
Sid


Re: How to delete the record

Posted by Mich Talebzadeh <mi...@gmail.com>.
There are two ways of doing it.


   1. Through the snapshot offered by the storage format, meaning an
   immutable snapshot of the state of the table at a given version. For
   example, the state
   <https://books.japila.pl/delta-lake-internals/Snapshot/#state> of a Delta
   table <https://books.japila.pl/delta-lake-internals/Snapshot/#deltaLog> at
   a given version
   <https://books.japila.pl/delta-lake-internals/Snapshot/#version>.
   2. Creating your own versioning. Taking your example, you define the
   target storage *with two added columns, namely:* op_type INT
   (1-insert, 2-update, 3-delete) and op_time TIMESTAMP <as of ingestion_time>.
   Your example record will be:


id    op_type    op_time
1     1          <ingestion_time>
1     3          <ingestion_time>


       from pyspark.sql.functions import lit, current_timestamp

       # rdd holds the incoming rows as 7-column tuples
       df = rdd.toDF(). \
            withColumnRenamed("_1", "ID"). \
            withColumnRenamed("_2", "CLUSTERED"). \
            withColumnRenamed("_3", "SCATTERED"). \
            withColumnRenamed("_4", "RANDOMISED"). \
            withColumnRenamed("_5", "RANDOM_STRING"). \
            withColumnRenamed("_6", "SMALL_VC"). \
            withColumnRenamed("_7", "PADDING"). \
            withColumn("op_type", lit(1)). \
            withColumn("op_time", current_timestamp())

Then you can look at all the records that were created and subsequently
deleted, and at what time:


SELECT ID, op_time FROM my_table WHERE op_type IN (1, 3)
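
And for the delete case, a minimal sketch of recording the delete without
touching the original row (how you obtain the ids deleted at the source
depends on your CDC feed, so deleted_ids below is an assumption):

    from pyspark.sql.functions import lit, current_timestamp

    # deleted_ids: a DataFrame of the IDs reported as deleted at the source
    latest = (spark.table("my_table")
              .join(deleted_ids, "ID")
              .dropDuplicates(["ID"])
              .cache())
    latest.count()  # materialise before appending back to the same table

    delete_markers = (latest.drop("op_type", "op_time")
                      .withColumn("op_type", lit(3))
                      .withColumn("op_time", current_timestamp()))

    # Append-only: the original op_type = 1 rows stay untouched
    delete_markers.write.mode("append").saveAsTable("my_table")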


HTH



Re: How to delete the record

Posted by Sid Kal <fl...@gmail.com>.
Hi Sean,

So do you mean that if I use those file formats, they will do the work of
CDC automatically, or would I have to handle it via code?

Hi Mich,

I'm not sure I understood you. Let me try to explain my scenario. Suppose
there is an id "1" which is inserted today, so I transformed and ingested
it. Now suppose this user id is deleted from the source itself. How can I
then delete it in my transformed DB?




Re: How to delete the record

Posted by Sean Owen <sr...@gmail.com>.
This is what storage engines like Delta, Hudi and Iceberg are for. No need
to manage it manually or use a DBMS. These formats allow deletes, upserts,
etc. of data on cloud storage, using Spark.
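
To make that concrete, a hedged sketch of a Delta merge that upserts and
deletes in one pass (the table path, and the is_deleted flag on the
incoming changes DataFrame, are assumptions about the CDC feed):

    from delta.tables import DeltaTable

    target = DeltaTable.forPath(spark, "s3://my-bucket/warehouse/users")

    (target.alias("t")
     .merge(changes.alias("c"), "t.id = c.id")
     .whenMatchedDelete(condition="c.is_deleted = true")   # propagate deletes
     .whenMatchedUpdateAll()                               # apply updates
     .whenNotMatchedInsertAll(condition="c.is_deleted = false")  # new rows
     .execute())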


Re: How to delete the record

Posted by Mich Talebzadeh <mi...@gmail.com>.
Sid,

How do you cater for updates? Do you add an update as a new record, without
touching the original record? That approach allows you to see the history
of a record, i.e. inserted once, deleted once and updated *n* times
throughout the entity life history of the record.

So your mileage varies depending on what you want to do about maintaining
the history of events (an event is a change in state: an update of some key
business item, or a row in your case). An event-driven architecture (EDA)
consists of an event producer (your source), an event consumer (your final
storage, a DB or storage bucket), and an event broker (your Spark engine).
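
For instance, with op_type/op_time soft-flag columns in place (my_table is
a hypothetical name), the entity life history of one record can be read
back in order:

    # Entity life history of record 1: one insert, n updates, one delete
    spark.sql("""
        SELECT ID, op_type, op_time
        FROM my_table
        WHERE ID = 1
        ORDER BY op_time
    """).show()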


HTH




Re: How to delete the record

Posted by Sid Kal <fl...@gmail.com>.
Hi Mich,

Thanks for your time.

Data is stored in S3 via DMS and is read in the Spark jobs.

How can I mark a record as a soft delete?

Any small snippet / link / example. Anything would help.

Thanks,
Sid


Re: How to delete the record

Posted by Mich Talebzadeh <mi...@gmail.com>.
Where is the ETL data stored?



*But now the main problem is that when a record is deleted at the source,
it should be deleted from my final transformed records too.*


If your final sink (storage) is a data warehouse, records should be soft
flagged with op_type (Insert/Update/Delete) and op_time (timestamp).



HTH

