Posted to dev@iceberg.apache.org by Erik Wright <er...@shopify.com.INVALID> on 2018/11/27 20:03:12 UTC

Re: Status of Spark Integration, Questions

>
> *Upserts/Deletes*
>>
>> I have jobs that apply upserts/deletes to datasets. My current approach
>> is:
>>
>>    1. calculate the affected partitions (collected in the Driver)
>>    2. load up the previous versions of all of those partitions as a
>>    DataFrame
>>    3. apply the upserts/deletes
>>    4. write out the new versions of the affected partitions (containing
>>    old data plus/minus the upserts/deletes)
>>    5. update my index file
>>
>> How is this intended to be done in Iceberg? I see that there are a bunch
>> of Table operations. Would it be up to me to still do steps 1-4 and then
>> rely on Iceberg to do step 5 using the table operations?
>>
>
> Currently, you can delete data by reading, filtering, and overwriting what
> you read. That's an atomic operation so it is safe to read and overwrite
> the same data.
>

Does such an operation take advantage of partitioning to minimize write
amplification? For example, let's say I do something like this:

path = 'some_path'
df = read(path)
df = (
    df
    .join(keys_to_delete, on=['partition_col'], how='anti')
    .union(upserts)
)
df.write(path)

Is this going to result in scanning and rewriting the entire dataset even
if the keys to delete are in a small subset of partitions? I would imagine
so. What is the correct way to do this?

Re: Status of Spark Integration, Questions

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
This depends on how you write the data. If you read and then overwrite what
you read, then it would work and be reasonably efficient. Iceberg supports
this.

On the other hand, if you read and then append, then you’ll get duplicates
and won’t remove any rows. So you have to choose the right write semantics.
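
To make the difference between the two write semantics concrete, here is a minimal sketch in plain Python. It is a toy model only: the table is a list of dicts, and `append`, `overwrite`, and `in_scope` are hypothetical helpers invented for illustration, not Iceberg or Spark API.

```python
# Toy model of a table as a list of rows. The helpers below are
# hypothetical and only mimic the semantics discussed in this thread.

def append(table, new_rows):
    """Append semantics: existing rows stay, new rows are added."""
    return table + new_rows


def overwrite(table, predicate, new_rows):
    """Overwrite semantics: rows matching predicate are replaced by new_rows."""
    return [row for row in table if not predicate(row)] + new_rows


table = [
    {"id": 1, "day": "2018-11-01"},
    {"id": 2, "day": "2018-11-01"},
    {"id": 3, "day": "2018-11-02"},
]


def in_scope(row):
    # The slice of the table that is read and later overwritten.
    return row["day"] == "2018-11-01"


# Read the slice, then drop the row we want to delete (id 2).
read = [row for row in table if in_scope(row)]
kept = [row for row in read if row["id"] != 2]

# Appending the filtered rows duplicates id 1 and leaves id 2 in place.
appended = append(table, kept)

# Overwriting the same slice removes id 2 and keeps exactly one copy of id 1.
overwritten = overwrite(table, in_scope, kept)
```

In this model the append result has four rows (id 1 twice, id 2 still present), while the overwrite result has only ids 1 and 3.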

We plan to make overwrites explicit in Spark to make it clear what is
happening. That would look like this:

df = spark.table("t").filter($"ts".cast(DateType) == "2018-11-01")
// filter df
df.write.to("t").overwrite($"ts".cast(DateType) == "2018-11-01")

That way you would use the same filter to load and overwrite data.
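
As a rough illustration of why scoping the read and the overwrite to the same slice limits write amplification, the sketch below models a partitioned table as a plain Python dict and rewrites only the partitions touched by the deletes and upserts. Everything here is an assumption made for illustration: `upsert_delete`, the dict layout, and the sample data are hypothetical, and this is not how Iceberg is implemented.

```python
# Toy model: a partitioned table is a dict of partition -> {key: value}.
# upsert_delete is a hypothetical helper, not Iceberg or Spark API.

def upsert_delete(partitions, deletes, upserts):
    """Apply deletes and upserts, rewriting only the affected partitions.

    deletes is a list of (partition, key) pairs and upserts is a list of
    (partition, key, value) triples. Returns the new table and the set of
    partitions that were rewritten.
    """
    # Step 1: work out which partitions are affected.
    affected = {p for p, _ in deletes} | {p for p, _, _ in upserts}

    # Untouched partitions are carried over as-is (no copy, no rewrite).
    result = dict(partitions)

    for partition in affected:
        # Steps 2-4: load the old version, apply the changes, write the new one.
        rows = dict(partitions.get(partition, {}))
        for p, key in deletes:
            if p == partition:
                rows.pop(key, None)
        for p, key, value in upserts:
            if p == partition:
                rows[key] = value
        result[partition] = rows

    return result, affected


partitions = {
    "2018-11-01": {"a": 1, "b": 2},
    "2018-11-02": {"c": 3},
    "2018-11-03": {"d": 4},
}
deletes = [("2018-11-01", "a")]
upserts = [("2018-11-01", "b", 20), ("2018-11-01", "e", 5)]

new_partitions, rewritten = upsert_delete(partitions, deletes, upserts)
```

Only the 2018-11-01 partition is rewritten in this example; the other two are reused unchanged, which is the behavior the question about write amplification is after.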


-- 
Ryan Blue
Software Engineer
Netflix