Posted to dev@spark.apache.org by Patrick Woody <pa...@gmail.com> on 2018/04/01 16:32:32 UTC

Re: DataSourceV2 write input requirements

Yep, that sounds reasonable to me!

On Fri, Mar 30, 2018 at 5:50 PM, Ted Yu <yu...@gmail.com> wrote:

> +1
>
> -------- Original message --------
> From: Ryan Blue <rb...@netflix.com>
> Date: 3/30/18 2:28 PM (GMT-08:00)
> To: Patrick Woody <pa...@gmail.com>
> Cc: Russell Spitzer <ru...@gmail.com>, Wenchen Fan <
> cloud0fan@gmail.com>, Ted Yu <yu...@gmail.com>, Spark Dev List <
> dev@spark.apache.org>
> Subject: Re: DataSourceV2 write input requirements
>
> You're right. A global sort would change the clustering if it had more
> fields than the clustering.
>
> Then what about this: if there is no RequiredClustering, then the sort is
> a global sort. If RequiredClustering is present, then the clustering is
> applied and the sort is a partition-level sort.
>
> That rule would mean that within a partition you always get the sort, but
> an explicit clustering overrides the partitioning a sort might try to
> introduce. Does that sound reasonable?
>
> rb
>
> On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody <pa...@gmail.com>
> wrote:
>
>> Does that methodology work in this specific case? The ordering must be a
>> subset of the clustering to guarantee they exist in the same partition when
>> doing a global sort I thought. Though I get the gist that if it does
>> satisfy, then there is no reason to not choose the global sort.
>>
>> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> > Can you expand on how the ordering containing the clustering
>>> expressions would ensure the global sort?
>>>
>>> The idea was to basically assume that if the clustering can be satisfied
>>> by a global sort, then do the global sort. For example, if the clustering
>>> is Set("b", "a") and the sort is Seq("a", "b", "c") then do a global sort
>>> by columns a, b, and c.
>>>
>>> Technically, you could do this with a hash partitioner instead of a
>>> range partitioner and sort within each partition, but that doesn't make
>>> much sense because the partitioning would ensure that each partition has
>>> just one combination of the required clustering columns. Using a hash
>>> partitioner would make it so that the in-partition sort basically ignores
>>> the first few values, so it must be that the intent was a global sort.
>>>
>>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <patrick.woody1@gmail.com
>>> > wrote:
>>>
>>>>> Right, you could use this to store a global ordering if there is only
>>>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>> case, you would still have a clustering and an ordering, but the ordering
>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>> partition ordinal for the source to store would be required.
>>>>
>>>>
>>>> Can you expand on how the ordering containing the clustering
>>>> expressions would ensure the global sort? Having a RangePartitioning would
>>>> certainly satisfy, but it isn't required - is the suggestion that if Spark
>>>> sees this overlap, then it plans a global sort?
>>>>
>>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
>>>> russell.spitzer@gmail.com> wrote:
>>>>
>>>>> @RyanBlue I'm hoping that through the CBO effort we will continue to
>>>>> get more detailed statistics. Like on read we could be using sketch data
>>>>> structures to get estimates on unique values and density for each column.
>>>>> You may be right that the real way for this to be handled would be giving a
>>>>> "cost" back to a higher order optimizer which can decide which method to
>>>>> use rather than having the data source itself do it. This is probably in a
>>>>> far future version of the api.
>>>>>
>>>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>>> Cassandra can insert records with the same partition-key faster if
>>>>>>> they arrive in the same payload. But this is only beneficial if the
>>>>>>> incoming dataset has multiple entries for the same partition key.
>>>>>>
>>>>>> Thanks for the example, the recommended partitioning use case makes
>>>>>> more sense now. I think we could have two interfaces, a
>>>>>> RequiresClustering and a RecommendsClustering if we want to support
>>>>>> this. But I’m skeptical it will be useful for two reasons:
>>>>>>
>>>>>>    - Do we want to optimize the low cardinality case? Shuffles are
>>>>>>    usually much cheaper at smaller sizes, so I’m not sure it is necessary to
>>>>>>    optimize this away.
>>>>>>    - How do we know there aren’t just a few partition keys for all
>>>>>>    the records? It may look like a shuffle wouldn’t help, but we don’t know
>>>>>>    the partition keys until it is too late.
>>>>>>
>>>>>> Then there’s also the logic for avoiding the shuffle and how to
>>>>>> calculate the cost, which sounds like something that needs some details
>>>>>> from CBO.
>>>>>>
>>>>>>> I would assume that given the estimated data size from Spark and
>>>>>>> options passed in from the user, the data source could make a more
>>>>>>> intelligent requirement on the write format than Spark independently.
>>>>>>
>>>>>> This is a good point.
>>>>>>
>>>>>> What would an implementation actually do here and how would
>>>>>> information be passed? For my use cases, the store would produce the number
>>>>>> of tasks based on the estimated incoming rows, because the source has the
>>>>>> best idea of how the rows will compress. But, that’s just applying a
>>>>>> multiplier most of the time. To be very useful, this would have to handle
>>>>>> skew in the rows (think row with a type where total size depends on type)
>>>>>> and that’s a bit harder. I think maybe an interface that can provide
>>>>>> relative cost estimates based on partition keys would be helpful, but then
>>>>>> keep the planning logic in Spark.
>>>>>>
>>>>>> This is probably something that we could add later as we find use
>>>>>> cases that require it?
>>>>>>
>>>>>>> I wouldn’t assume that a data source requiring a certain write format
>>>>>>> would give any guarantees around reading the same data? In the cases where
>>>>>>> it is a complete overwrite it would, but for independent writes it could
>>>>>>> still be useful for statistics or compression.
>>>>>>
>>>>>> Right, you could use this to store a global ordering if there is only
>>>>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>>> case, you would still have a clustering and an ordering, but the ordering
>>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>>> partition ordinal for the source to store would be required.
>>>>>>
>>>>>> For the second point that ordering is useful for statistics and
>>>>>> compression, I completely agree. Our best practices doc tells users to
>>>>>> always add a global sort when writing because you get the benefit of a
>>>>>> range partitioner to handle skew, plus the stats and compression you’re
>>>>>> talking about to optimize for reads. I think the proposed API can request a
>>>>>> global ordering from Spark already. My only point is that there isn’t much
>>>>>> the source can do to guarantee ordering for reads when there is more than
>>>>>> one write.
>>>>>>
>>>>>> On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <
>>>>>> patrick.woody1@gmail.com> wrote:
>>>>>>
>>>>>>>> Spark would always apply the required clustering and sort order
>>>>>>>> because they are required by the data source. It is reasonable for a source
>>>>>>>> to reject data that isn’t properly prepared. For example, data must be
>>>>>>>> written to HTable files with keys in order or else the files are invalid.
>>>>>>>> Sorting should not be implemented in the sources themselves because Spark
>>>>>>>> handles concerns like spilling to disk. Spark must prepare data correctly,
>>>>>>>> which is why the interfaces start with “Requires”.
>>>>>>>
>>>>>>>
>>>>>>> This was in reference to Russell's suggestion that the data source
>>>>>>> could have a required sort, but only a recommended partitioning. I don't
>>>>>>> have an immediate recommending use case that would come to mind though. I'm
>>>>>>> definitely in sync that the data source itself shouldn't do work outside of
>>>>>>> the writes themselves.
>>>>>>>
>>>>>>>> Considering the second use case you mentioned first, I don’t think
>>>>>>>> it is a good idea for a table to put requirements on the number of tasks
>>>>>>>> used for a write. The parallelism should be set appropriately for the data
>>>>>>>> volume, which is for Spark or the user to determine. A minimum or maximum
>>>>>>>> number of tasks could cause bad behavior.
>>>>>>>
>>>>>>>
>>>>>>>> For your first use case, an explicit global ordering, the problem is
>>>>>>>> that there can’t be an explicit global ordering for a table when it is
>>>>>>>> populated by a series of independent writes. Each write could have a global
>>>>>>>> order, but once those files are written, you have to deal with multiple
>>>>>>>> sorted data sets. I think it makes sense to focus on order within data
>>>>>>>> files, not order between data files.
>>>>>>>
>>>>>>>
>>>>>>> This is where I'm interested in learning about the separation of
>>>>>>> responsibilities for the data source and how "smart" it is supposed to be.
>>>>>>>
>>>>>>> For the first part, I would assume that given the estimated data
>>>>>>> size from Spark and options passed in from the user, the data source could
>>>>>>> make a more intelligent requirement on the write format than Spark
>>>>>>> independently. Somewhat analogous to how the current FileSource does bin
>>>>>>> packing of small files on the read side, restricting parallelism for the
>>>>>>> sake of overhead.
>>>>>>>
>>>>>>> For the second, I wouldn't assume that a data source requiring a
>>>>>>> certain write format would give any guarantees around reading the same
>>>>>>> data? In the cases where it is a complete overwrite it would, but for
>>>>>>> independent writes it could still be useful for statistics or compression.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Pat
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>

Re: DataSourceV2 write input requirements

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Since it sounds like there is consensus here, I've opened an issue for
this: https://issues.apache.org/jira/browse/SPARK-23889
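
The rule agreed on in this thread can be sketched in a few lines. This is
only an illustration in plain Python, not Spark code: WritePlan and
plan_write are hypothetical names, and both the use of a hash partitioner
for the clustering case and the alphabetical ordering of the clustering
columns are assumptions made for the example, not something the proposal
specifies.

```python
from dataclasses import dataclass
from typing import AbstractSet, Optional, Sequence, Tuple


@dataclass(frozen=True)
class WritePlan:
    """Hypothetical summary of how rows are prepared before a write."""
    partitioning: str                    # "range", "hash", or "none"
    partition_columns: Tuple[str, ...]   # columns used to distribute rows
    sort_columns: Tuple[str, ...]        # columns each task sorts by


def plan_write(clustering: Optional[AbstractSet[str]],
               ordering: Sequence[str]) -> WritePlan:
    """Apply the rule from the thread to a source's write requirements."""
    sort_columns = tuple(ordering)
    if clustering:
        # Explicit clustering wins: distribute rows by the clustering
        # columns (hash partitioning is an assumption here) and treat the
        # ordering as a sort within each partition only.
        return WritePlan("hash", tuple(sorted(clustering)), sort_columns)
    if sort_columns:
        # No clustering: the ordering is a global sort, i.e. range
        # partition by the sort columns, then sort within each partition.
        return WritePlan("range", sort_columns, sort_columns)
    # Neither requirement: rows are written as they arrive.
    return WritePlan("none", (), ())


# Ordering only: global sort by a, b, c.
print(plan_write(None, ["a", "b", "c"]))
# Clustering on {a, b} plus an ordering: cluster, then sort per partition.
print(plan_write({"b", "a"}, ["a", "b", "c"]))
```

In the second call no global sort is planned, because the explicit
clustering overrides the range partitioning that the sort would otherwise
introduce.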

On Sun, Apr 1, 2018 at 9:32 AM, Patrick Woody <pa...@gmail.com>
wrote:

> Yep, that sounds reasonable to me!
>
> On Fri, Mar 30, 2018 at 5:50 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> +1
>>
>> -------- Original message --------
>> From: Ryan Blue <rb...@netflix.com>
>> Date: 3/30/18 2:28 PM (GMT-08:00)
>> To: Patrick Woody <pa...@gmail.com>
>> Cc: Russell Spitzer <ru...@gmail.com>, Wenchen Fan <
>> cloud0fan@gmail.com>, Ted Yu <yu...@gmail.com>, Spark Dev List <
>> dev@spark.apache.org>
>> Subject: Re: DataSourceV2 write input requirements
>>
>> You're right. A global sort would change the clustering if it had more
>> fields than the clustering.
>>
>> Then what about this: if there is no RequiredClustering, then the sort is
>> a global sort. If RequiredClustering is present, then the clustering is
>> applied and the sort is a partition-level sort.
>>
>> That rule would mean that within a partition you always get the sort, but
>> an explicit clustering overrides the partitioning a sort might try to
>> introduce. Does that sound reasonable?
>>
>> rb
>>
>> On Fri, Mar 30, 2018 at 12:39 PM, Patrick Woody <patrick.woody1@gmail.com
>> > wrote:
>>
>>> Does that methodology work in this specific case? The ordering must be a
>>> subset of the clustering to guarantee they exist in the same partition when
>>> doing a global sort I thought. Though I get the gist that if it does
>>> satisfy, then there is no reason to not choose the global sort.
>>>
>>> On Fri, Mar 30, 2018 at 1:31 PM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> > Can you expand on how the ordering containing the clustering
>>>> expressions would ensure the global sort?
>>>>
>>>> The idea was to basically assume that if the clustering can be
>>>> satisfied by a global sort, then do the global sort. For example, if the
>>>> clustering is Set("b", "a") and the sort is Seq("a", "b", "c") then do a
>>>> global sort by columns a, b, and c.
>>>>
>>>> Technically, you could do this with a hash partitioner instead of a
>>>> range partitioner and sort within each partition, but that doesn't make
>>>> much sense because the partitioning would ensure that each partition has
>>>> just one combination of the required clustering columns. Using a hash
>>>> partitioner would make it so that the in-partition sort basically ignores
>>>> the first few values, so it must be that the intent was a global sort.
>>>>
>>>> On Fri, Mar 30, 2018 at 6:51 AM, Patrick Woody <
>>>> patrick.woody1@gmail.com> wrote:
>>>>
>>>>>> Right, you could use this to store a global ordering if there is only
>>>>>> one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>>> case, you would still have a clustering and an ordering, but the ordering
>>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>>> partition ordinal for the source to store would be required.
>>>>>
>>>>>
>>>>> Can you expand on how the ordering containing the clustering
>>>>> expressions would ensure the global sort? Having a RangePartitioning would
>>>>> certainly satisfy, but it isn't required - is the suggestion that if Spark
>>>>> sees this overlap, then it plans a global sort?
>>>>>
>>>>> On Thu, Mar 29, 2018 at 12:16 PM, Russell Spitzer <
>>>>> russell.spitzer@gmail.com> wrote:
>>>>>
>>>>>> @RyanBlue I'm hoping that through the CBO effort we will continue to
>>>>>> get more detailed statistics. Like on read we could be using sketch data
>>>>>> structures to get estimates on unique values and density for each column.
>>>>>> You may be right that the real way for this to be handled would be giving a
>>>>>> "cost" back to a higher order optimizer which can decide which method to
>>>>>> use rather than having the data source itself do it. This is probably in a
>>>>>> far future version of the api.
>>>>>>
>>>>>> On Thu, Mar 29, 2018 at 9:10 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>>> Cassandra can insert records with the same partition-key faster if
>>>>>>>> they arrive in the same payload. But this is only beneficial if the
>>>>>>>> incoming dataset has multiple entries for the same partition key.
>>>>>>>
>>>>>>> Thanks for the example, the recommended partitioning use case makes
>>>>>>> more sense now. I think we could have two interfaces, a
>>>>>>> RequiresClustering and a RecommendsClustering if we want to support
>>>>>>> this. But I’m skeptical it will be useful for two reasons:
>>>>>>>
>>>>>>>    - Do we want to optimize the low cardinality case? Shuffles are
>>>>>>>    usually much cheaper at smaller sizes, so I’m not sure it is necessary to
>>>>>>>    optimize this away.
>>>>>>>    - How do we know there aren’t just a few partition keys for all
>>>>>>>    the records? It may look like a shuffle wouldn’t help, but we don’t know
>>>>>>>    the partition keys until it is too late.
>>>>>>>
>>>>>>> Then there’s also the logic for avoiding the shuffle and how to
>>>>>>> calculate the cost, which sounds like something that needs some details
>>>>>>> from CBO.
>>>>>>>
>>>>>>>> I would assume that given the estimated data size from Spark and
>>>>>>>> options passed in from the user, the data source could make a more
>>>>>>>> intelligent requirement on the write format than Spark independently.
>>>>>>>
>>>>>>> This is a good point.
>>>>>>>
>>>>>>> What would an implementation actually do here and how would
>>>>>>> information be passed? For my use cases, the store would produce the number
>>>>>>> of tasks based on the estimated incoming rows, because the source has the
>>>>>>> best idea of how the rows will compress. But, that’s just applying a
>>>>>>> multiplier most of the time. To be very useful, this would have to handle
>>>>>>> skew in the rows (think row with a type where total size depends on type)
>>>>>>> and that’s a bit harder. I think maybe an interface that can provide
>>>>>>> relative cost estimates based on partition keys would be helpful, but then
>>>>>>> keep the planning logic in Spark.
>>>>>>>
>>>>>>> This is probably something that we could add later as we find use
>>>>>>> cases that require it?
>>>>>>>
>>>>>>>> I wouldn’t assume that a data source requiring a certain write
>>>>>>>> format would give any guarantees around reading the same data? In the cases
>>>>>>>> where it is a complete overwrite it would, but for independent writes it
>>>>>>>> could still be useful for statistics or compression.
>>>>>>>
>>>>>>> Right, you could use this to store a global ordering if there is
>>>>>>> only one write (e.g., CTAS). I don’t think anything needs to change in that
>>>>>>> case, you would still have a clustering and an ordering, but the ordering
>>>>>>> would need to include all fields of the clustering. A way to pass in the
>>>>>>> partition ordinal for the source to store would be required.
>>>>>>>
>>>>>>> For the second point that ordering is useful for statistics and
>>>>>>> compression, I completely agree. Our best practices doc tells users to
>>>>>>> always add a global sort when writing because you get the benefit of a
>>>>>>> range partitioner to handle skew, plus the stats and compression you’re
>>>>>>> talking about to optimize for reads. I think the proposed API can request a
>>>>>>> global ordering from Spark already. My only point is that there isn’t much
>>>>>>> the source can do to guarantee ordering for reads when there is more than
>>>>>>> one write.
>>>>>>>
>>>>>>> On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <
>>>>>>> patrick.woody1@gmail.com> wrote:
>>>>>>>
>>>>>>>>> Spark would always apply the required clustering and sort order
>>>>>>>>> because they are required by the data source. It is reasonable for a source
>>>>>>>>> to reject data that isn’t properly prepared. For example, data must be
>>>>>>>>> written to HTable files with keys in order or else the files are invalid.
>>>>>>>>> Sorting should not be implemented in the sources themselves because Spark
>>>>>>>>> handles concerns like spilling to disk. Spark must prepare data correctly,
>>>>>>>>> which is why the interfaces start with “Requires”.
>>>>>>>>
>>>>>>>>
>>>>>>>> This was in reference to Russell's suggestion that the data source
>>>>>>>> could have a required sort, but only a recommended partitioning. I don't
>>>>>>>> have an immediate recommending use case that would come to mind though. I'm
>>>>>>>> definitely in sync that the data source itself shouldn't do work outside of
>>>>>>>> the writes themselves.
>>>>>>>>
>>>>>>>>> Considering the second use case you mentioned first, I don’t think
>>>>>>>>> it is a good idea for a table to put requirements on the number of tasks
>>>>>>>>> used for a write. The parallelism should be set appropriately for the data
>>>>>>>>> volume, which is for Spark or the user to determine. A minimum or maximum
>>>>>>>>> number of tasks could cause bad behavior.
>>>>>>>>
>>>>>>>>
>>>>>>>>> For your first use case, an explicit global ordering, the problem
>>>>>>>>> is that there can’t be an explicit global ordering for a table when it is
>>>>>>>>> populated by a series of independent writes. Each write could have a global
>>>>>>>>> order, but once those files are written, you have to deal with multiple
>>>>>>>>> sorted data sets. I think it makes sense to focus on order within data
>>>>>>>>> files, not order between data files.
>>>>>>>>
>>>>>>>>
>>>>>>>> This is where I'm interested in learning about the separation of
>>>>>>>> responsibilities for the data source and how "smart" it is supposed to be.
>>>>>>>>
>>>>>>>> For the first part, I would assume that given the estimated data
>>>>>>>> size from Spark and options passed in from the user, the data source could
>>>>>>>> make a more intelligent requirement on the write format than Spark
>>>>>>>> independently. Somewhat analogous to how the current FileSource does bin
>>>>>>>> packing of small files on the read side, restricting parallelism for the
>>>>>>>> sake of overhead.
>>>>>>>>
>>>>>>>> For the second, I wouldn't assume that a data source requiring a
>>>>>>>> certain write format would give any guarantees around reading the same
>>>>>>>> data? In the cases where it is a complete overwrite it would, but for
>>>>>>>> independent writes it could still be useful for statistics or compression.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Pat
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>


-- 
Ryan Blue
Software Engineer
Netflix