Posted to dev@spark.apache.org by "Thakrar, Jayesh" <jt...@conversantmedia.com> on 2018/04/24 02:49:59 UTC

Datasource API V2 and checkpointing

I was wondering: when checkpointing is enabled, who does the actual work?
The streaming datasource or the execution engine/driver?

I have written a small/trivial datasource that just generates strings.
After enabling checkpointing, I do see a folder being created under the checkpoint folder, but there's nothing else in there.

Same question for write-ahead and recovery?
And on a restart from a failed streaming session - who should set the offsets?
The driver/Spark or the datasource?

Any pointers to design docs would also be greatly appreciated.

Thanks,
Jayesh


Re: Datasource API V2 and checkpointing

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
Just wondering:

Given that V2 is currently less performant because it uses Row rather than InternalRow (and other things?), is still evolving, and is missing some V1 features, it might help to focus on remediating those gaps first and then look at porting the file sources over.

As for the escape hatch (or additional capabilities), can that be implemented as traits?

And IMHO, file sources and other core sources should have the same citizenship level as is granted to the other sources in V2. That way, others can use them as good references for emulation.
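To make the "traits" idea concrete, here is a minimal sketch of an opt-in capability. Python mixin classes stand in for Scala traits, and every name in it (`StreamingSource`, `ManagesOwnCheckpoint`, `prepare`) is hypothetical and not part of any Spark API. It only shows the shape: the engine checks whether a source opted in and hands over a checkpoint directory only in that case.

```python
class StreamingSource:
    """Baseline contract: the engine persists whatever offset the source reports."""

    def latest_offset(self):
        raise NotImplementedError


class ManagesOwnCheckpoint:
    """Hypothetical opt-in capability (the "escape hatch")."""

    def use_checkpoint_dir(self, path):
        raise NotImplementedError


class SimpleSource(StreamingSource):
    def latest_offset(self):
        return 7


class FileLikeSource(StreamingSource, ManagesOwnCheckpoint):
    def __init__(self):
        self.checkpoint_dir = None

    def latest_offset(self):
        return 3

    def use_checkpoint_dir(self, path):
        self.checkpoint_dir = path


def prepare(source, source_dir):
    """Engine side: give a scratch directory only to sources that opted in."""
    if isinstance(source, ManagesOwnCheckpoint):
        source.use_checkpoint_dir(source_dir)
        return True
    return False


simple = SimpleSource()
file_like = FileLikeSource()
simple_opted_in = prepare(simple, "/tmp/ckpt/sources/0")
file_like_opted_in = prepare(file_like, "/tmp/ckpt/sources/1")
```

With this shape, sources that do not mix in the capability never see a checkpoint path at all.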

Jayesh

________________________________
From: Joseph Torres <jo...@databricks.com>
Sent: Tuesday, May 1, 2018 1:58:54 PM
To: Ryan Blue
Cc: Thakrar, Jayesh; dev@spark.apache.org
Subject: Re: Datasource API V2 and checkpointing

I agree that Spark should fully handle state serialization and recovery for most sources. This is how it works in V1, and we definitely wouldn't want or need to change that in V2.* The question is just whether we should have an escape hatch for the sources that don't want Spark to do that, and if so what the escape hatch should look like.

I don't think a watermark checkpoint would work, because there's no guarantee (especially considering the "maxFilesPerTrigger" option) that all files with the same timestamp will be in the same batch. But in general, changing the fundamental mechanics of how file sources take checkpoints seems like it would impose a serious risk of performance regressions, which I don't think is a desirable risk when performing an API migration that's going to swap out users' queries from under them. I would be very uncomfortable merging a V2 file source which we can't confidently assert has the same performance characteristics as the existing one.


* Technically, most current sources do write their initial offset to the checkpoint directory, but this is just a workaround to the fact that the V1 API has no handle to give Spark the initial offset. So if you e.g. start a Kafka stream from latest offsets, and it fails in the first batch, Spark won't know to restart the stream from the initial offset which was originally generated. That's easily fixable in V2, and then no source will have to even look at the checkpoint directory if it doesn't want to.

On Tue, May 1, 2018 at 10:26 AM, Ryan Blue <rb...@netflix.com> wrote:
I think there's a difference. You're right that we wanted to clean up the API in V2 to avoid file sources using side channels. But there's a big difference between adding, for example, a way to report partitioning and designing for sources that need unbounded state. It's a judgment call, but I think unbounded state is definitely not something that we should design around. Another way to think about it: yes, we want to design a better API using existing sources as guides, but we don't need to assume that everything those sources do should be supported. It is reasonable to say that this is a case we don't want to design for and the source needs to change. Why can't we use a high watermark of files' modified timestamps?

For most sources, I think Spark should handle state serialization and recovery. Maybe we can find a good way to make the file source with unbounded state work, but this shouldn't be one of the driving cases for the design and consequently a reason for every source to need to manage its own state in a checkpoint directory.

rb

On Mon, Apr 30, 2018 at 12:37 PM, Joseph Torres <jo...@databricks.com> wrote:
I'd argue that letting bad cases influence the design is an explicit goal of DataSourceV2. One of the primary motivations for the project was that file sources hook into a series of weird internal side channels, with favorable performance characteristics that are difficult to match in the API we actually declare to Spark users. So a design that we can't migrate file sources to without a side channel would be worrying; won't we end up regressing to the same situation?

On Mon, Apr 30, 2018 at 11:59 AM, Ryan Blue <rb...@netflix.com> wrote:
Should we really plan the API for a source with state that grows indefinitely? It sounds like we're letting a bad case influence the design, when we probably shouldn't.

On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <jo...@databricks.com> wrote:
Offset is just a type alias for arbitrary JSON-serializable state. Most implementations should (and do) just toss the blob at Spark and let Spark handle recovery on its own.

In the case of file streams, the obstacle is that the conceptual offset is very large: a list of every file which the stream has ever read. In order to parse this efficiently, the stream connector needs detailed control over how it's stored; the current implementation even has complex compactification and retention logic.


On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
Why don't we just have the source return a Serializable of state when it reports offsets? Then Spark could handle storing the source's state and the source wouldn't need to worry about file system paths. I think that would be easier for implementations and better for recovery because it wouldn't leave unknown state on a single machine's file system.

rb

On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <jo...@databricks.com> wrote:
The precise interactions with the DataSourceV2 API haven't yet been hammered out in design. But much of this comes down to the core of Structured Streaming rather than the API details.

The execution engine handles checkpointing and recovery. It asks the streaming data source for offsets, and then determines that batch N contains the data between offset A and offset B. On recovery, if batch N needs to be re-run, the execution engine just asks the source for the same offset range again. Sources also get a handle to their own subfolder of the checkpoint, which they can use as scratch space if they need. For example, Spark's FileStreamReader keeps a log of all the files it's seen, so its offsets can be simply indices into the log rather than huge strings containing all the paths.
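The division of labor described above can be sketched with a toy model. This is not Spark code: `CounterSource`, `ToyEngine`, and the `offsets` directory layout are assumptions made up for illustration. The point is only that the engine writes the offset range for batch N before running it, and on restart replays that logged range, while the source never touches the checkpoint.

```python
import json
import os
import tempfile


class CounterSource:
    """Toy source: record i is the string "row-i"; the offset is a plain integer."""

    def __init__(self):
        self.available = 0

    def latest_offset(self):
        return self.available

    def get_batch(self, start, end):
        # Deterministic: the same offset range always yields the same rows.
        return [f"row-{i}" for i in range(start, end)]


class ToyEngine:
    """Hypothetical engine: it alone writes the offset log (a write-ahead log)."""

    def __init__(self, source, checkpoint_dir):
        self.source = source
        self.offsets_dir = os.path.join(checkpoint_dir, "offsets")
        os.makedirs(self.offsets_dir, exist_ok=True)

    def plan_batch(self, batch_id, start):
        # Record the range for batch N before any data is processed.
        end = self.source.latest_offset()
        with open(os.path.join(self.offsets_dir, str(batch_id)), "w") as f:
            json.dump({"start": start, "end": end}, f)
        return start, end

    def run_batch(self, batch_id):
        # On recovery, replay the logged range instead of asking for a new one.
        with open(os.path.join(self.offsets_dir, str(batch_id))) as f:
            logged = json.load(f)
        return self.source.get_batch(logged["start"], logged["end"])


checkpoint = tempfile.mkdtemp()
source = CounterSource()
source.available = 3
engine = ToyEngine(source, checkpoint)
engine.plan_batch(0, start=0)
first_attempt = engine.run_batch(0)

# Simulated restart: more data has arrived, but batch 0 is replayed as logged.
source.available = 10
restarted = ToyEngine(source, checkpoint)
replayed = restarted.run_batch(0)
```

In this model a trivial source that only generates strings would leave its own scratch folder empty, since everything needed for recovery lives in the engine's log.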

SPARK-23323 is orthogonal. That commit coordinator is responsible for ensuring that, within a single Spark job, two different tasks can't commit the same partition.

On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <jt...@conversantmedia.com> wrote:
Wondering if this issue is related to SPARK-23323?

Any pointers will be greatly appreciated.

Thanks,
Jayesh

From: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Date: Monday, April 23, 2018 at 9:49 PM
To: "dev@spark.apache.org" <de...@spark.apache.org>
Subject: Datasource API V2 and checkpointing

I was wondering when checkpointing is enabled, who does the actual work?
The streaming datasource or the execution engine/driver?

I have written a small/trivial datasource that just generates strings.
After enabling checkpointing, I do see a folder being created under the checkpoint folder, but there's nothing else in there.

Same question for write-ahead and recovery?
And on a restart from a failed streaming session - who should set the offsets?
The driver/Spark or the datasource?

Any pointers to design docs would also be greatly appreciated.

Thanks,
Jayesh





--
Ryan Blue
Software Engineer
Netflix






Re: Datasource API V2 and checkpointing

Posted by Joseph Torres <jo...@databricks.com>.
I agree that Spark should fully handle state serialization and recovery for
most sources. This is how it works in V1, and we definitely wouldn't want
or need to change that in V2.* The question is just whether we should have
an escape hatch for the sources that don't want Spark to do that, and if so
what the escape hatch should look like.

I don't think a watermark checkpoint would work, because there's no
guarantee (especially considering the "maxFilesPerTrigger" option) that all
files with the same timestamp will be in the same batch. But in general,
changing the fundamental mechanics of how file sources take checkpoints
seems like it would impose a serious risk of performance regressions, which
I don't think is a desirable risk when performing an API migration that's
going to swap out users' queries from under them. I would be very
uncomfortable merging a V2 file source which we can't confidently assert
has the same performance characteristics as the existing one.
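The watermark objection can be illustrated with a small simulation. It is a sketch under assumptions: the file names, timestamps, and the helper `next_batch_by_watermark` are invented, and the per-batch limit merely imitates what "maxFilesPerTrigger" does. Three files share one modification time, the limit splits them across batches, and a checkpoint that stores only the highest timestamp loses one of them.

```python
# (path, modified time); three files share timestamp 100.
files = [("a.json", 100), ("b.json", 100), ("c.json", 100), ("d.json", 200)]
MAX_FILES_PER_TRIGGER = 2


def next_batch_by_watermark(files, watermark, limit):
    """Pick up to `limit` files strictly newer than the watermark."""
    candidates = sorted(
        (f for f in files if f[1] > watermark), key=lambda f: (f[1], f[0])
    )
    batch = candidates[:limit]
    # The only checkpointed state is the highest timestamp processed so far.
    new_watermark = max((ts for _, ts in batch), default=watermark)
    return batch, new_watermark


batch1, wm1 = next_batch_by_watermark(files, 0, MAX_FILES_PER_TRIGGER)
batch2, wm2 = next_batch_by_watermark(files, wm1, MAX_FILES_PER_TRIGGER)

processed = {path for path, _ in batch1 + batch2}
missed = {path for path, _ in files} - processed
```

Here `c.json` is never read, because the watermark moved to 100 after the first batch. Switching the comparison to "newer than or equal" would avoid the loss but would re-read `a.json` and `b.json`, so a timestamp alone cannot identify the exact set of files already consumed.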


* Technically, most current sources do write their initial offset to the
checkpoint directory, but this is just a workaround to the fact that the V1
API has no handle to give Spark the initial offset. So if you e.g. start a
Kafka stream from latest offsets, and it fails in the first batch, Spark
won't know to restart the stream from the initial offset which was
originally generated. That's easily fixable in V2, and then no source will
have to even look at the checkpoint directory if it doesn't want to.

On Tue, May 1, 2018 at 10:26 AM, Ryan Blue <rb...@netflix.com> wrote:

> I think there's a difference. You're right that we wanted to clean up the
> API in V2 to avoid file sources using side channels. But there's a big
> difference between adding, for example, a way to report partitioning and
> designing for sources that need unbounded state. It's a judgment call, but
> I think unbounded state is definitely not something that we should design
> around. Another way to think about it: yes, we want to design a better API
> using existing sources as guides, but we don't need to assume that
> everything those sources do should be supported. It is reasonable to say
> that this is a case we don't want to design for and the source needs to
> change. Why can't we use a high watermark of files' modified timestamps?
>
> For most sources, I think Spark should handle state serialization and
> recovery. Maybe we can find a good way to make the file source with
> unbounded state work, but this shouldn't be one of the driving cases for
> the design and consequently a reason for every source to need to manage its
> own state in a checkpoint directory.
>
> rb
>
> On Mon, Apr 30, 2018 at 12:37 PM, Joseph Torres <
> joseph.torres@databricks.com> wrote:
>
>> I'd argue that letting bad cases influence the design is an explicit goal
>> of DataSourceV2. One of the primary motivations for the project was that
>> file sources hook into a series of weird internal side channels, with
>> favorable performance characteristics that are difficult to match in the
>> API we actually declare to Spark users. So a design that we can't migrate
>> file sources to without a side channel would be worrying; won't we end up
>> regressing to the same situation?
>>
>> On Mon, Apr 30, 2018 at 11:59 AM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Should we really plan the API for a source with state that grows
>>> indefinitely? It sounds like we're letting a bad case influence the
>>> design, when we probably shouldn't.
>>>
>>> On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <
>>> joseph.torres@databricks.com> wrote:
>>>
>>>> Offset is just a type alias for arbitrary JSON-serializable state. Most
>>>> implementations should (and do) just toss the blob at Spark and let Spark
>>>> handle recovery on its own.
>>>>
>>>> In the case of file streams, the obstacle is that the conceptual offset
>>>> is very large: a list of every file which the stream has ever read. In
>>>> order to parse this efficiently, the stream connector needs detailed
>>>> control over how it's stored; the current implementation even has complex
>>>> compactification and retention logic.
>>>>
>>>>
>>>> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> Why don't we just have the source return a Serializable of state when
>>>>> it reports offsets? Then Spark could handle storing the source's state and
>>>>> the source wouldn't need to worry about file system paths. I think that
>>>>> would be easier for implementations and better for recovery because it
>>>>> wouldn't leave unknown state on a single machine's file system.
>>>>>
>>>>> rb
>>>>>
>>>>> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
>>>>> joseph.torres@databricks.com> wrote:
>>>>>
>>>>>> The precise interactions with the DataSourceV2 API haven't yet been
>>>>>> hammered out in design. But much of this comes down to the core of
>>>>>> Structured Streaming rather than the API details.
>>>>>>
>>>>>> The execution engine handles checkpointing and recovery. It asks the
>>>>>> streaming data source for offsets, and then determines that batch N
>>>>>> contains the data between offset A and offset B. On recovery, if batch N
>>>>>> needs to be re-run, the execution engine just asks the source for the same
>>>>>> offset range again. Sources also get a handle to their own subfolder of the
>>>>>> checkpoint, which they can use as scratch space if they need. For example,
>>>>>> Spark's FileStreamReader keeps a log of all the files it's seen, so its
>>>>>> offsets can be simply indices into the log rather than huge strings
>>>>>> containing all the paths.
>>>>>>
>>>>>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>>>>>> ensuring that, within a single Spark job, two different tasks can't commit
>>>>>> the same partition.
>>>>>>
>>>>>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>>>>>> jthakrar@conversantmedia.com> wrote:
>>>>>>
>>>>>>> Wondering if this issue is related to SPARK-23323?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Any pointers will be greatly appreciated….
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jayesh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
>>>>>>> *Date: *Monday, April 23, 2018 at 9:49 PM
>>>>>>> *To: *"dev@spark.apache.org" <de...@spark.apache.org>
>>>>>>> *Subject: *Datasource API V2 and checkpointing
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I was wondering when checkpointing is enabled, who does the actual
>>>>>>> work?
>>>>>>>
>>>>>>> The streaming datasource or the execution engine/driver?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I have written a small/trivial datasource that just generates
>>>>>>> strings.
>>>>>>>
>>>>>>> After enabling checkpointing, I do see a folder being created under
>>>>>>> the checkpoint folder, but there's nothing else in there.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Same question for write-ahead and recovery?
>>>>>>>
>>>>>>> And on a restart from a failed streaming session - who should set
>>>>>>> the offsets?
>>>>>>>
>>>>>>> The driver/Spark or the datasource?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Any pointers to design docs would also be greatly appreciated.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jayesh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Datasource API V2 and checkpointing

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
I think there's a difference. You're right that we wanted to clean up the
API in V2 to avoid file sources using side channels. But there's a big
difference between adding, for example, a way to report partitioning and
designing for sources that need unbounded state. It's a judgment call, but
I think unbounded state is definitely not something that we should design
around. Another way to think about it: yes, we want to design a better API
using existing sources as guides, but we don't need to assume that
everything those sources do should be supported. It is reasonable to say
that this is a case we don't want to design for and the source needs to
change. Why can't we use a high watermark of files' modified timestamps?

For most sources, I think Spark should handle state serialization and
recovery. Maybe we can find a good way to make the file source with
unbounded state work, but this shouldn't be one of the driving cases for
the design and consequently a reason for every source to need to manage its
own state in a checkpoint directory.

rb

On Mon, Apr 30, 2018 at 12:37 PM, Joseph Torres <
joseph.torres@databricks.com> wrote:

> I'd argue that letting bad cases influence the design is an explicit goal
> of DataSourceV2. One of the primary motivations for the project was that
> file sources hook into a series of weird internal side channels, with
> favorable performance characteristics that are difficult to match in the
> API we actually declare to Spark users. So a design that we can't migrate
> file sources to without a side channel would be worrying; won't we end up
> regressing to the same situation?
>
> On Mon, Apr 30, 2018 at 11:59 AM, Ryan Blue <rb...@netflix.com> wrote:
>
>> Should we really plan the API for a source with state that grows
>> indefinitely? It sounds like we're letting a bad case influence the
>> design, when we probably shouldn't.
>>
>> On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <
>> joseph.torres@databricks.com> wrote:
>>
>>> Offset is just a type alias for arbitrary JSON-serializable state. Most
>>> implementations should (and do) just toss the blob at Spark and let Spark
>>> handle recovery on its own.
>>>
>>> In the case of file streams, the obstacle is that the conceptual offset
>>> is very large: a list of every file which the stream has ever read. In
>>> order to parse this efficiently, the stream connector needs detailed
>>> control over how it's stored; the current implementation even has complex
>>> compactification and retention logic.
>>>
>>>
>>> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> Why don't we just have the source return a Serializable of state when
>>>> it reports offsets? Then Spark could handle storing the source's state and
>>>> the source wouldn't need to worry about file system paths. I think that
>>>> would be easier for implementations and better for recovery because it
>>>> wouldn't leave unknown state on a single machine's file system.
>>>>
>>>> rb
>>>>
>>>> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
>>>> joseph.torres@databricks.com> wrote:
>>>>
>>>>> The precise interactions with the DataSourceV2 API haven't yet been
>>>>> hammered out in design. But much of this comes down to the core of
>>>>> Structured Streaming rather than the API details.
>>>>>
>>>>> The execution engine handles checkpointing and recovery. It asks the
>>>>> streaming data source for offsets, and then determines that batch N
>>>>> contains the data between offset A and offset B. On recovery, if batch N
>>>>> needs to be re-run, the execution engine just asks the source for the same
>>>>> offset range again. Sources also get a handle to their own subfolder of the
>>>>> checkpoint, which they can use as scratch space if they need. For example,
>>>>> Spark's FileStreamReader keeps a log of all the files it's seen, so its
>>>>> offsets can be simply indices into the log rather than huge strings
>>>>> containing all the paths.
>>>>>
>>>>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>>>>> ensuring that, within a single Spark job, two different tasks can't commit
>>>>> the same partition.
>>>>>
>>>>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>>>>> jthakrar@conversantmedia.com> wrote:
>>>>>
>>>>>> Wondering if this issue is related to SPARK-23323?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any pointers will be greatly appreciated….
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jayesh
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
>>>>>> *Date: *Monday, April 23, 2018 at 9:49 PM
>>>>>> *To: *"dev@spark.apache.org" <de...@spark.apache.org>
>>>>>> *Subject: *Datasource API V2 and checkpointing
>>>>>>
>>>>>>
>>>>>>
>>>>>> I was wondering when checkpointing is enabled, who does the actual
>>>>>> work?
>>>>>>
>>>>>> The streaming datasource or the execution engine/driver?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have written a small/trivial datasource that just generates strings.
>>>>>>
>>>>>> After enabling checkpointing, I do see a folder being created under
>>>>>> the checkpoint folder, but there's nothing else in there.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Same question for write-ahead and recovery?
>>>>>>
>>>>>> And on a restart from a failed streaming session - who should set the
>>>>>> offsets?
>>>>>>
>>>>>> The driver/Spark or the datasource?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any pointers to design docs would also be greatly appreciated.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jayesh
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Datasource API V2 and checkpointing

Posted by Joseph Torres <jo...@databricks.com>.
I'd argue that letting bad cases influence the design is an explicit goal
of DataSourceV2. One of the primary motivations for the project was that
file sources hook into a series of weird internal side channels, with
favorable performance characteristics that are difficult to match in the
API we actually declare to Spark users. So a design that we can't migrate
file sources to without a side channel would be worrying; won't we end up
regressing to the same situation?

On Mon, Apr 30, 2018 at 11:59 AM, Ryan Blue <rb...@netflix.com> wrote:

> Should we really plan the API for a source with state that grows
> indefinitely? It sounds like we're letting a bad case influence the
> design, when we probably shouldn't.
>
> On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <
> joseph.torres@databricks.com> wrote:
>
>> Offset is just a type alias for arbitrary JSON-serializable state. Most
>> implementations should (and do) just toss the blob at Spark and let Spark
>> handle recovery on its own.
>>
>> In the case of file streams, the obstacle is that the conceptual offset
>> is very large: a list of every file which the stream has ever read. In
>> order to parse this efficiently, the stream connector needs detailed
>> control over how it's stored; the current implementation even has complex
>> compactification and retention logic.
>>
>>
>> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Why don't we just have the source return a Serializable of state when it
>>> reports offsets? Then Spark could handle storing the source's state and the
>>> source wouldn't need to worry about file system paths. I think that would
>>> be easier for implementations and better for recovery because it wouldn't
>>> leave unknown state on a single machine's file system.
>>>
>>> rb
>>>
>>> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
>>> joseph.torres@databricks.com> wrote:
>>>
>>>> The precise interactions with the DataSourceV2 API haven't yet been
>>>> hammered out in design. But much of this comes down to the core of
>>>> Structured Streaming rather than the API details.
>>>>
>>>> The execution engine handles checkpointing and recovery. It asks the
>>>> streaming data source for offsets, and then determines that batch N
>>>> contains the data between offset A and offset B. On recovery, if batch N
>>>> needs to be re-run, the execution engine just asks the source for the same
>>>> offset range again. Sources also get a handle to their own subfolder of the
>>>> checkpoint, which they can use as scratch space if they need. For example,
>>>> Spark's FileStreamReader keeps a log of all the files it's seen, so its
>>>> offsets can be simply indices into the log rather than huge strings
>>>> containing all the paths.
>>>>
>>>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>>>> ensuring that, within a single Spark job, two different tasks can't commit
>>>> the same partition.
>>>>
>>>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>>>> jthakrar@conversantmedia.com> wrote:
>>>>
>>>>> Wondering if this issue is related to SPARK-23323?
>>>>>
>>>>>
>>>>>
>>>>> Any pointers will be greatly appreciated….
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jayesh
>>>>>
>>>>>
>>>>>
>>>>> *From: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
>>>>> *Date: *Monday, April 23, 2018 at 9:49 PM
>>>>> *To: *"dev@spark.apache.org" <de...@spark.apache.org>
>>>>> *Subject: *Datasource API V2 and checkpointing
>>>>>
>>>>>
>>>>>
>>>>> I was wondering when checkpointing is enabled, who does the actual
>>>>> work?
>>>>>
>>>>> The streaming datasource or the execution engine/driver?
>>>>>
>>>>>
>>>>>
>>>>> I have written a small/trivial datasource that just generates strings.
>>>>>
>>>>> After enabling checkpointing, I do see a folder being created under
>>>>> the checkpoint folder, but there's nothing else in there.
>>>>>
>>>>>
>>>>>
>>>>> Same question for write-ahead and recovery?
>>>>>
>>>>> And on a restart from a failed streaming session - who should set the
>>>>> offsets?
>>>>>
>>>>> The driver/Spark or the datasource?
>>>>>
>>>>>
>>>>>
>>>>> Any pointers to design docs would also be greatly appreciated.
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jayesh
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Datasource API V2 and checkpointing

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Should we really plan the API for a source with state that grows
indefinitely? It sounds like we're letting a bad case influence the design,
when we probably shouldn't.

On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <
joseph.torres@databricks.com> wrote:

> Offset is just a type alias for arbitrary JSON-serializable state. Most
> implementations should (and do) just toss the blob at Spark and let Spark
> handle recovery on its own.
>
> In the case of file streams, the obstacle is that the conceptual offset is
> very large: a list of every file which the stream has ever read. In order
> to parse this efficiently, the stream connector needs detailed control over
> how it's stored; the current implementation even has complex
> compactification and retention logic.
>
>
> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
>
>> Why don't we just have the source return a Serializable of state when it
>> reports offsets? Then Spark could handle storing the source's state and the
>> source wouldn't need to worry about file system paths. I think that would
>> be easier for implementations and better for recovery because it wouldn't
>> leave unknown state on a single machine's file system.
>>
>> rb
>>
>> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
>> joseph.torres@databricks.com> wrote:
>>
>>> The precise interactions with the DataSourceV2 API haven't yet been
>>> hammered out in design. But much of this comes down to the core of
>>> Structured Streaming rather than the API details.
>>>
>>> The execution engine handles checkpointing and recovery. It asks the
>>> streaming data source for offsets, and then determines that batch N
>>> contains the data between offset A and offset B. On recovery, if batch N
>>> needs to be re-run, the execution engine just asks the source for the same
>>> offset range again. Sources also get a handle to their own subfolder of the
>>> checkpoint, which they can use as scratch space if they need. For example,
>>> Spark's FileStreamReader keeps a log of all the files it's seen, so its
>>> offsets can be simply indices into the log rather than huge strings
>>> containing all the paths.
>>>
>>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>>> ensuring that, within a single Spark job, two different tasks can't commit
>>> the same partition.
>>>
>>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>>> jthakrar@conversantmedia.com> wrote:
>>>
>>>> Wondering if this issue is related to SPARK-23323?
>>>>
>>>>
>>>>
>>>> Any pointers will be greatly appreciated….
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Jayesh
>>>>
>>>>
>>>>
>>>> *From: *"Thakrar, Jayesh" <jt...@conversantmedia.com>
>>>> *Date: *Monday, April 23, 2018 at 9:49 PM
>>>> *To: *"dev@spark.apache.org" <de...@spark.apache.org>
>>>> *Subject: *Datasource API V2 and checkpointing
>>>>
>>>>
>>>>
>>>> I was wondering when checkpointing is enabled, who does the actual work?
>>>>
>>>> The streaming datasource or the execution engine/driver?
>>>>
>>>>
>>>>
>>>> I have written a small/trivial datasource that just generates strings.
>>>>
>>>> After enabling checkpointing, I do see a folder being created under the
>>>> checkpoint folder, but there's nothing else in there.
>>>>
>>>>
>>>>
>>>> Same question for write-ahead and recovery?
>>>>
>>>> And on a restart from a failed streaming session - who should set the
>>>> offsets?
>>>>
>>>> The driver/Spark or the datasource?
>>>>
>>>>
>>>>
>>>> Any pointers to design docs would also be greatly appreciated.
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Jayesh
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Datasource API V2 and checkpointing

Posted by Joseph Torres <jo...@databricks.com>.
Offset is just a type alias for arbitrary JSON-serializable state. Most
implementations should (and do) just toss the blob at Spark and let Spark
handle recovery on its own.

In the case of file streams, the obstacle is that the conceptual offset is
very large: a list of every file which the stream has ever read. In order
to parse this efficiently, the stream connector needs detailed control over
how it's stored; the current implementation even has complex
compactification and retention logic.
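A rough sketch of the contrast drawn above, with made-up shapes: the partition map, the `FileLog` class, and the `logOffset` field name are illustrative assumptions rather than Spark's actual formats. A small offset can simply be handed to the engine as JSON, whereas a file stream keeps its growing file list in a source-side log and reports only an index into it.

```python
import json

# A small offset: per-partition positions, easy for the engine to store as-is.
small_offset = json.dumps({"topic-0": 42, "topic-1": 17})


class FileLog:
    """Hypothetical source-side log: entry i lists the files first seen in batch i."""

    def __init__(self):
        self.entries = []

    def add(self, new_files):
        self.entries.append(list(new_files))
        return len(self.entries) - 1  # the log index doubles as the offset

    def files_between(self, start, end):
        """Files recorded in entries (start, end]; start=-1 means from the beginning."""
        return [path for entry in self.entries[start + 1 : end + 1] for path in entry]


log = FileLog()
offset0 = log.add(["/data/part-0000.json", "/data/part-0001.json"])
offset1 = log.add(["/data/part-0002.json"])

# What the engine would checkpoint: a tiny index, not the whole file list.
file_offset = json.dumps({"logOffset": offset1})

# The "conceptual" offset: every file ever read, which grows without bound.
conceptual_offset = json.dumps([path for entry in log.entries for path in entry])
```

The index stays a few bytes long however many files are read, which is why the source wants control over how the underlying log is stored and compacted.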


On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:

> Why don't we just have the source return a Serializable of state when it
> reports offsets? Then Spark could handle storing the source's state and the
> source wouldn't need to worry about file system paths. I think that would
> be easier for implementations and better for recovery because it wouldn't
> leave unknown state on a single machine's file system.
>
> rb
>

Re: Datasource API V2 and checkpointing

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Why don't we just have the source return a Serializable of state when it
reports offsets? Then Spark could handle storing the source's state and the
source wouldn't need to worry about file system paths. I think that would
be easier for implementations and better for recovery because it wouldn't
leave unknown state on a single machine's file system.
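
A small Python sketch of what this proposal could look like. The names (CountingSource, engine_checkpoint, engine_restore) and the JSON file layout are assumptions for illustration only, not an existing or planned Spark API. The source returns its state next to the offset and never touches a path; the engine decides where the state lives.

```python
import json
import os
import tempfile
from typing import Any, Dict, Optional, Tuple


class CountingSource:
    """Hypothetical source: reports an offset plus opaque state, touches no files."""

    def __init__(self, state: Optional[Dict[str, Any]] = None) -> None:
        self.state = state or {"emitted": 0}

    def latest(self) -> Tuple[int, Dict[str, Any]]:
        # Pretend ten more records became available since the last call.
        self.state = {"emitted": self.state["emitted"] + 10}
        return self.state["emitted"], self.state


def engine_checkpoint(path: str, offset: int, state: Dict[str, Any]) -> None:
    # The engine, not the source, decides where and how the state is stored.
    with open(path, "w") as f:
        json.dump({"offset": offset, "state": state}, f)


def engine_restore(path: str) -> CountingSource:
    # On restart the engine rebuilds the source from the saved state.
    with open(path) as f:
        saved = json.load(f)
    return CountingSource(saved["state"])


with tempfile.TemporaryDirectory() as checkpoint_dir:
    path = os.path.join(checkpoint_dir, "source-0.json")
    offset, state = CountingSource().latest()
    engine_checkpoint(path, offset, state)
    recovered = engine_restore(path)
    next_offset, _ = recovered.latest()
```

The recovered source continues from 10 to 20 without ever having seen the checkpoint directory.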

rb

On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <joseph.torres@databricks.com> wrote:

> The precise interactions with the DataSourceV2 API haven't yet been
> hammered out in design. But much of this comes down to the core of
> Structured Streaming rather than the API details.
>
> The execution engine handles checkpointing and recovery. It asks the
> streaming data source for offsets, and then determines that batch N
> contains the data between offset A and offset B. On recovery, if batch N
> needs to be re-run, the execution engine just asks the source for the same
> offset range again. Sources also get a handle to their own subfolder of the
> checkpoint, which they can use as scratch space if they need. For example,
> Spark's FileStreamReader keeps a log of all the files it's seen, so its
> offsets can be simply indices into the log rather than huge strings
> containing all the paths.
>
> SPARK-23323 is orthogonal. That commit coordinator is responsible for
> ensuring that, within a single Spark job, two different tasks can't commit
> the same partition.
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Datasource API V2 and checkpointing

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
Thanks Joseph!

From: Joseph Torres <jo...@databricks.com>
Date: Friday, April 27, 2018 at 11:23 AM
To: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Cc: "dev@spark.apache.org" <de...@spark.apache.org>
Subject: Re: Datasource API V2 and checkpointing

The precise interactions with the DataSourceV2 API haven't yet been hammered out in design. But much of this comes down to the core of Structured Streaming rather than the API details.

The execution engine handles checkpointing and recovery. It asks the streaming data source for offsets, and then determines that batch N contains the data between offset A and offset B. On recovery, if batch N needs to be re-run, the execution engine just asks the source for the same offset range again. Sources also get a handle to their own subfolder of the checkpoint, which they can use as scratch space if they need. For example, Spark's FileStreamReader keeps a log of all the files it's seen, so its offsets can be simply indices into the log rather than huge strings containing all the paths.

SPARK-23323 is orthogonal. That commit coordinator is responsible for ensuring that, within a single Spark job, two different tasks can't commit the same partition.




Re: Datasource API V2 and checkpointing

Posted by Joseph Torres <jo...@databricks.com>.
The precise interactions with the DataSourceV2 API haven't yet been
hammered out in design. But much of this comes down to the core of
Structured Streaming rather than the API details.

The execution engine handles checkpointing and recovery. It asks the
streaming data source for offsets, and then determines that batch N
contains the data between offset A and offset B. On recovery, if batch N
needs to be re-run, the execution engine just asks the source for the same
offset range again. Sources also get a handle to their own subfolder of the
checkpoint, which they can use as scratch space if they need. For example,
Spark's FileStreamReader keeps a log of all the files it's seen, so its
offsets can be simply indices into the log rather than huge strings
containing all the paths.
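
To illustrate the division of labor, here is a toy Python model of the loop described above. ListSource, Engine and the dict used as an offset log are all hypothetical; this is not how Structured Streaming is implemented, just the shape of the protocol: the engine records the offset range for batch N before running it, and a restarted engine replays the recorded range.

```python
from typing import Dict, List, Tuple


class ListSource:
    """Hypothetical replayable source backed by an in-memory list."""

    def __init__(self, data: List[str]) -> None:
        self.data = data

    def latest_offset(self) -> int:
        return len(self.data)

    def get_batch(self, start: int, end: int) -> List[str]:
        # Must return the same rows every time for the same range.
        return self.data[start:end]


class Engine:
    """Hypothetical driver loop; offset_log stands in for the checkpoint."""

    def __init__(self, offset_log: Dict[int, Tuple[int, int]]) -> None:
        self.offset_log = offset_log

    def plan_batch(self, source: ListSource) -> int:
        batch_id = len(self.offset_log)
        start = self.offset_log[batch_id - 1][1] if batch_id else 0
        # Write-ahead: record the range before any data is processed.
        self.offset_log[batch_id] = (start, source.latest_offset())
        return batch_id

    def run_batch(self, source: ListSource, batch_id: int) -> List[str]:
        start, end = self.offset_log[batch_id]
        return source.get_batch(start, end)


source = ListSource(["a", "b", "c"])
checkpoint: Dict[int, Tuple[int, int]] = {}
engine = Engine(checkpoint)
batch = engine.plan_batch(source)            # batch 0 covers offsets 0 to 3
first_run = engine.run_batch(source, batch)

source.data.append("d")                      # new data arrives, then the driver dies
restarted = Engine(checkpoint)               # a new engine reads the same log
rerun = restarted.run_batch(source, batch)   # re-running batch 0 ignores "d"
next_batch = restarted.plan_batch(source)    # "d" lands in batch 1
```

The source never decides which offsets to resume from; it only answers range requests.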

SPARK-23323 is orthogonal. That commit coordinator is responsible for
ensuring that, within a single Spark job, two different tasks can't commit
the same partition.
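
For contrast with checkpointing, the coordinator's job can be sketched as a first-asker-wins arbiter. This Python class is a hypothetical simplification, not Spark's coordinator: a real one would also have to cope with the winning attempt failing, which is not modeled here.

```python
import threading
from typing import Dict


class CommitCoordinator:
    """Hypothetical first-asker-wins arbiter for partition commits."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._winner: Dict[int, int] = {}  # partition -> authorized attempt

    def can_commit(self, partition: int, attempt: int) -> bool:
        with self._lock:
            # setdefault records the first asker and returns it to everyone after.
            return self._winner.setdefault(partition, attempt) == attempt


coordinator = CommitCoordinator()
results = [
    coordinator.can_commit(0, 0),  # first attempt on partition 0 wins
    coordinator.can_commit(0, 1),  # a speculative duplicate is refused
    coordinator.can_commit(1, 1),  # other partitions are independent
    coordinator.can_commit(0, 0),  # the winner may ask again safely
]
```

Nothing here is persisted across restarts, which is why it is a separate concern from offsets and recovery.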

On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
jthakrar@conversantmedia.com> wrote:

> Wondering if this issue is related to SPARK-23323?
>
> Any pointers will be greatly appreciated….
>
> Thanks,
> Jayesh
>
> From: "Thakrar, Jayesh" <jt...@conversantmedia.com>
> Date: Monday, April 23, 2018 at 9:49 PM
> To: "dev@spark.apache.org" <de...@spark.apache.org>
> Subject: Datasource API V2 and checkpointing
>
> I was wondering when checkpointing is enabled, who does the actual work?
> The streaming datasource or the execution engine/driver?
>
> I have written a small/trivial datasource that just generates strings.
> After enabling checkpointing, I do see a folder being created under the
> checkpoint folder, but there's nothing else in there.
>
> Same question for write-ahead and recovery?
> And on a restart from a failed streaming session - who should set the
> offsets?
> The driver/Spark or the datasource?
>
> Any pointers to design docs would also be greatly appreciated.
>
> Thanks,
> Jayesh

Re: Datasource API V2 and checkpointing

Posted by "Thakrar, Jayesh" <jt...@conversantmedia.com>.
Wondering if this issue is related to SPARK-23323?

Any pointers will be greatly appreciated….

Thanks,
Jayesh

From: "Thakrar, Jayesh" <jt...@conversantmedia.com>
Date: Monday, April 23, 2018 at 9:49 PM
To: "dev@spark.apache.org" <de...@spark.apache.org>
Subject: Datasource API V2 and checkpointing

I was wondering when checkpointing is enabled, who does the actual work?
The streaming datasource or the execution engine/driver?

I have written a small/trivial datasource that just generates strings.
After enabling checkpointing, I do see a folder being created under the checkpoint folder, but there's nothing else in there.

Same question for write-ahead and recovery?
And on a restart from a failed streaming session - who should set the offsets?
The driver/Spark or the datasource?

Any pointers to design docs would also be greatly appreciated.

Thanks,
Jayesh