Posted to dev@spark.apache.org by Jungtaek Lim <ka...@gmail.com> on 2019/02/26 06:12:51 UTC

[SS] Allowing stream Sink metadata as part of checkpoint?

Hi devs,

I was about to give this a try, but since it relates to DSv2 I decided to
start a new thread before doing the actual work. I also don't think it
needs to be part of the DSv2 discussion, since the change would be minor.

While dealing with SPARK-24295 [1] and SPARK-26411 [2], I felt the need to
keep sink metadata in the checkpoint directory. However, unlike a source,
whose metadata directory is provided as a subdirectory of the checkpoint
directory, a sink doesn't receive its own metadata directory.

For example, FileStreamSink creates its metadata directory in the output
directory. That is somewhat intentional, so that the metadata can be shared
between queries, but sometimes we may want to couple it with the query
checkpoint.

What do you think about passing a metadata path to the sink (we have only
one sink per query) so that sink metadata can be coupled with the query
checkpoint?

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-24295
2. https://issues.apache.org/jira/browse/SPARK-26411
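
To make the proposal above concrete, here is a minimal sketch of the paths
involved. It is only an illustration, not Spark code: the function names and
the "sink" subdirectory are hypothetical, while the "sources/<id>" layout and
the "_spark_metadata" directory reflect how sources and FileStreamSink are
understood to place their metadata today.

```python
from pathlib import PurePosixPath


def source_metadata_path(checkpoint_root: str, source_id: int) -> str:
    # Each source gets its own subdirectory under the query checkpoint.
    return str(PurePosixPath(checkpoint_root) / "sources" / str(source_id))


def file_sink_metadata_path(output_path: str) -> str:
    # FileStreamSink keeps its log next to the data, in the output directory,
    # so it is tied to the output rather than to any one query.
    return str(PurePosixPath(output_path) / "_spark_metadata")


def proposed_sink_metadata_path(checkpoint_root: str) -> str:
    # Hypothetical: a query has a single sink, so one fixed subdirectory
    # under the checkpoint would be enough. The name "sink" is an assumption.
    return str(PurePosixPath(checkpoint_root) / "sink")


if __name__ == "__main__":
    print(source_metadata_path("/ckpt/query1", 0))
    print(file_sink_metadata_path("/data/out"))
    print(proposed_sink_metadata_path("/ckpt/query1"))
```

With such a path, deleting or moving the checkpoint would take the sink
metadata with it, which is the coupling the proposal asks for.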

Re: [SS] Allowing stream Sink metadata as part of checkpoint?

Posted by Jungtaek Lim <ka...@gmail.com>.
I understand the reason for storing this information along with the data
for transactional commits, but that mostly makes sense if we store the
outputs together with all the necessary checkpoint information in a
transactional manner. Spark doesn't store the query checkpoint along with
the outputs.

I feel this is a question of who owns the sink metadata: the query or the
sink output. If the metadata contains the batch id and is used for
deduplication, it seems more coupled with the query, and it would be a
disaster to share that metadata with another query, or to let the
checkpoint and the sink metadata get out of sync.

One example is SPARK-26411 [1], as I mentioned: if we remove the checkpoint
as suggested and rerun the query with the same sink output, data loss
happens. Deduplication relies on the assumption that the output of each
batch is always the same, but once we remove the checkpoint that
assumption no longer holds.

I'm also seeing the other case, FileStreamSink, where the sink output
should be the owner of the sink metadata. But because of that, the metadata
can't be purged (a query cannot purge it, since it doesn't know which other
queries are also accessing it), and some end users who suffered from the
growing metadata reported this in SPARK-24295 [2].

So I'm considering both cases together and looking for a way to deal with
them.

-Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-26411
2. https://issues.apache.org/jira/browse/SPARK-24295
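
The data-loss scenario described above can be illustrated with a toy
simulation. This is a sketch under stated assumptions, not Spark's actual
implementation: BatchIdDedupSink and run_query are hypothetical names, and
the sink simply skips any batch id it has already committed, with that record
living alongside the output rather than in the checkpoint.

```python
class BatchIdDedupSink:
    """Toy sink that skips any batch id it has already committed."""

    def __init__(self):
        self.committed_batch_ids = set()  # sink metadata, kept with the output
        self.rows = []                    # the output itself

    def add_batch(self, batch_id, rows):
        if batch_id in self.committed_batch_ids:
            return False                  # treated as a replay and dropped
        self.rows.extend(rows)
        self.committed_batch_ids.add(batch_id)
        return True


def run_query(sink, batches, start_batch_id=0):
    """Feed batches to the sink, numbering them from start_batch_id."""
    return [
        sink.add_batch(start_batch_id + offset, rows)
        for offset, rows in enumerate(batches)
    ]


if __name__ == "__main__":
    sink = BatchIdDedupSink()
    # First run: batches 0 and 1 are written.
    print(run_query(sink, [["a"], ["b"]]))
    # Checkpoint removed: the query starts again at batch 0 with new data,
    # but the sink metadata still says batches 0 and 1 are done.
    print(run_query(sink, [["c"], ["d"]]))
    print(sink.rows)
```

In the second run both batches are rejected even though their contents are
new, so "c" and "d" never reach the output.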

On Tue, Feb 26, 2019 at 4:28 PM, Arun Mahadevan <ar...@apache.org> wrote:

> Unless it's some sink metadata to be maintained by the framework (e.g.
> sink state that needs to be passed back to the sink), would it make sense
> to keep it under the checkpoint dir?
>
> Maybe I am missing the motivation for the proposed approach, but I guess
> the sink mostly needs to store the last seen batchId to discard duplicate
> data during a batch replay. It would be ideal for the sink to store this
> information in the external store (along with the data) for
> de-duplication to work correctly.
>
> Thanks,
> Arun

Re: [SS] Allowing stream Sink metadata as part of checkpoint?

Posted by Arun Mahadevan <ar...@apache.org>.
Unless it's some sink metadata to be maintained by the framework (e.g. sink
state that needs to be passed back to the sink), would it make sense to
keep it under the checkpoint dir?

Maybe I am missing the motivation for the proposed approach, but I guess
the sink mostly needs to store the last seen batchId to discard duplicate
data during a batch replay. It would be ideal for the sink to store this
information in the external store (along with the data) for de-duplication
to work correctly.

Thanks,
Arun
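
The idea of keeping the last seen batchId in the external store, together
with the data, can be sketched as follows. This is only an illustration under
assumptions: an in-memory SQLite database stands in for the external store,
and the table names and the open_store/write_batch helpers are hypothetical.
The point is that the data and the batch id are committed in one transaction,
so a replayed batch is detected and skipped.

```python
import sqlite3


def open_store():
    """Create the stand-in external store with an output and a state table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE output (value TEXT)")
    conn.execute("CREATE TABLE sink_state (last_batch_id INTEGER)")
    conn.execute("INSERT INTO sink_state VALUES (-1)")  # nothing written yet
    conn.commit()
    return conn


def write_batch(conn, batch_id, rows):
    """Write a batch unless it was already committed; return True if written."""
    (last,) = conn.execute("SELECT last_batch_id FROM sink_state").fetchone()
    if batch_id <= last:
        return False  # replay of a committed batch: discard
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany("INSERT INTO output VALUES (?)", [(r,) for r in rows])
        conn.execute("UPDATE sink_state SET last_batch_id = ?", (batch_id,))
    return True


if __name__ == "__main__":
    store = open_store()
    print(write_batch(store, 0, ["a", "b"]))  # first attempt is written
    print(write_batch(store, 0, ["a", "b"]))  # replay is skipped
    print(write_batch(store, 1, ["c"]))       # next batch is written
```

Note that this only works while the batch ids the query produces stay
consistent with what the store has recorded.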


