Posted to dev@beam.apache.org by David Morávek <da...@gmail.com> on 2018/10/17 21:07:50 UTC

[PROPOSAL] Move sorting to sdks-java-core

Hello,

I want to summarize my thoughts on per-key value sorting.

Currently we have a separate module for the sorting extension. The extension
contains the *SortValues* transformation and implementations of different
sorters.

Performance-wise it would be great to be able *to delegate sorting to a
runner* if it supports sort-based shuffle. In order to do so, we should *move
the SortValues transformation to sdks-java-core*, so a runner can easily
provide its own implementation.

The robust implementation is needed mainly for building HFiles for the
HBase bulk load. When using the external sorter, we often sort the whole data
set twice (the shuffle may already have done the job).

SortValues cannot use a custom comparator, because we want to be able to
push the sorting logic down to a byte-based shuffle.

The usage of the SortValues transformation is a little bit confusing. I think
we should add a *SortValues.perKey* method, which accepts a secondary key
extractor and coder, as the usage would be easier to understand. Also, this
explicitly states that we sort values *perKey* only and that we sort using
an *encoded secondary key*. Example usage:


    PCollection<KV<String, Long>> input = ...;
    input.apply(SortValues.perKey(KV::getValue, BigEndianLongCoder.of()))
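
To make the intended semantics concrete, here is a minimal plain-Java sketch
of what sorting the values of a single key by an encoded secondary key
amounts to. It does not use Beam: the class PerKeySortSketch, its methods and
the encode function (standing in for a coder) are hypothetical names chosen
for illustration only, not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical illustration only; not part of Beam. */
class PerKeySortSketch {

  /** Lexicographic comparison that treats bytes as unsigned, as a byte-based shuffle would. */
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  /** Sorts the values of one key by the encoded bytes of an extracted secondary key. */
  static <V, S> List<V> sortValues(
      Iterable<V> values, Function<V, S> secondaryKey, Function<S, byte[]> encode) {
    List<V> sorted = new ArrayList<>();
    for (V v : values) {
      sorted.add(v);
    }
    // No user comparator is involved: only the encoded bytes decide the order.
    sorted.sort(
        (x, y) ->
            compareUnsigned(
                encode.apply(secondaryKey.apply(x)), encode.apply(secondaryKey.apply(y))));
    return sorted;
  }
}
```

For example, calling sortValues on the values "apple", "fig", "pear" with
String::length as the extractor and a one-byte encoding of the length orders
them by length. The ordering is only as good as the encoding, which is why
the choice of coder matters.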

What do you think? Is this the right direction?

Thanks for the comments!

Links:
-
http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E

Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by Robert Bradshaw <ro...@google.com>.
+1 to splitting out the Hadoop deps.

As has been said, there's no need to move it to core for runners to
optimize this. But perhaps a case could be made that this belongs in core?
(On the other hand, recent discussions indicate a desire to make core even
smaller.)

Also, +1 to re-thinking the API. The current API takes elements of the form

    KV<PrimaryK, Iterable<KV<SecondaryK, SecondaryValue>>>

and the proposed API (from what I understand) would instead take elements
of the form

    KV<PrimaryK, Iterable<Value>>

and extract the secondary key from Value for sorting. This may be less
efficient in the case that SecondaryValue above is empty or trivial.
Specifically, a runner may have to shuffle values of the form
serialized(key)+serialized(extractSecondaryKey(value))+serialized(value)
where serialized(value) may be quite redundant with
serialized(extractSecondaryKey(value)). I'm not sure this difference merits
the more complex API (especially as the only API).
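
A rough way to see the redundancy, assuming a hypothetical record layout that
simply concatenates the three serialized parts (real runners will lay out
shuffle records differently; ShuffleRecordSketch and its methods are made-up
names):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration only; not how any particular runner encodes shuffle records. */
class ShuffleRecordSketch {

  /** Fixed-width 8-byte encoding of a long. */
  static byte[] encodeLong(long v) {
    return ByteBuffer.allocate(8).putLong(v).array();
  }

  /** serialized(key) + serialized(secondaryKey) + serialized(value), concatenated. */
  static byte[] record(String key, long secondaryKey, byte[] value) {
    byte[] k = key.getBytes(StandardCharsets.UTF_8);
    byte[] s = encodeLong(secondaryKey);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(k, 0, k.length);
    out.write(s, 0, s.length);
    out.write(value, 0, value.length);
    return out.toByteArray();
  }
}
```

With a one-byte key and a value that is nothing but the secondary key itself,
record("k", 42L, encodeLong(42L)) is 17 bytes, while the same record with an
empty value is 9 bytes: the extra 8 bytes carry no new information.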

I would also like to isolate the user from dealing directly with coders,
and having to know which ones are order-preserving in their encoding.
(Technically, BigEndianLongCoder as used above is problematic as it sorts
negative values after positive ones...an easy trap to fall into.) Perhaps
we would allow a function V -> KS where KS is a type where we know of a
(registered) order-preserving coder. Order-preserving KV and Tuple coders
could be built out of these; it's a property like key-deterministic. (We'd
need a "LengthPrefixCoder" that preserves order as well?) This notion could
be useful for optimizations of operations like Top and Quantiles as well.
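
The sign problem is easy to reproduce without Beam. The sketch below assumes
a plain big-endian two's-complement encoding of a long compared as unsigned
bytes, which is what the remark about BigEndianLongCoder implies; flipping
the sign bit is one common way to get an order-preserving encoding. The class
and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration only; not part of Beam. */
class SignedLongOrderSketch {

  /** Plain big-endian two's-complement encoding (ByteBuffer is big-endian by default). */
  static byte[] bigEndian(long v) {
    return ByteBuffer.allocate(8).putLong(v).array();
  }

  /** Same layout with the sign bit flipped, so unsigned byte order matches numeric order. */
  static byte[] orderPreserving(long v) {
    return bigEndian(v ^ Long.MIN_VALUE);
  }

  /** Lexicographic comparison that treats bytes as unsigned. */
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

With the plain encoding, -1 (all bytes 0xff) compares greater than 1; with the
sign bit flipped, -1 compares less than 1 as expected.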


On Thu, Oct 18, 2018 at 11:19 AM David Morávek <da...@gmail.com>
wrote:

> Kenn, I believe we should not introduce a Hadoop dependency to either sdks
> or runners. We may split sorting into two packages, one with the
> transformation + in-memory implementation (this is the part I'd love to see
> become part of sdks-java-core) and a second module with the more robust
> external sorter (with the Hadoop dep).
>
> Does this make sense?
>
>
> On Thu, Oct 18, 2018 at 2:03 AM Dan Halperin <dh...@apache.org> wrote:
>
>> On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> The runner can always just depend on the sorter to do it the legacy way
>>> by class matching; it shouldn't incur other dependency penalties... but now
>>> that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
>>> price to pay for a user in any event. Are those Hadoop deps reasonably
>>> self-contained?
>>>
>>
>> Nice catch, Kenn! This is indeed why we didn't originally include the
>> Sorter in core. The Hadoop deps have an enormous surface, or did at the
>> time.
>>
>> Dan
>>
>>
>>>
>>> Kenn
>>>
>>> On Wed, Oct 17, 2018 at 2:39 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Merging the sorter into sdks-java-core isn't needed for pipelines
>>>> executed via portability since the Runner will be able to perform
>>>> PTransform replacement and optimization based upon the URN of the transform
>>>> and its payload so it would never need to have the "Sorter" class in its
>>>> classpath.
>>>>
>>>> I'm ambivalent about whether merging it now is worth it.
>>>>
>>>> On Wed, Oct 17, 2018 at 2:31 PM David Morávek <da...@gmail.com>
>>>> wrote:
>>>>
>>>>> We can always fall back to the External sorter in case of merging
>>>>> windows. I reckon in this case, values usually fit in memory, so it would
>>>>> not be an issue.
>>>>>
>>>>> In case of non-merging windows, a runner implementation would probably
>>>>> also need to group elements by window during shuffle.
>>>>>
>>>>> On Wed, Oct 17, 2018 at 11:10 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> One concern would be merging windows. This happens after shuffle, so
>>>>>> even if the shuffle were sorted you would need to do a sorted merge of two
>>>>>> sorted buffers.
>>>>>>
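
The sorted merge raised in the quoted concern about merging windows is cheap
compared to a full re-sort. A minimal sketch of merging two already-sorted
buffers follows; SortedMergeSketch is a hypothetical name, and real runners
would work on encoded bytes and possibly spilled data rather than in-memory
lists.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration only; not part of Beam. */
class SortedMergeSketch {

  /** Merges two lists that are each already sorted according to the given order. */
  static <T> List<T> merge(List<T> left, List<T> right, Comparator<? super T> order) {
    List<T> out = new ArrayList<>(left.size() + right.size());
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      // "<=" keeps the merge stable: ties are taken from the left buffer first.
      if (order.compare(left.get(i), right.get(j)) <= 0) {
        out.add(left.get(i++));
      } else {
        out.add(right.get(j++));
      }
    }
    while (i < left.size()) {
      out.add(left.get(i++));
    }
    while (j < right.size()) {
      out.add(right.get(j++));
    }
    return out;
  }
}
```

The merge visits each element once, so combining the buffers of two merged
windows costs time linear in their total size.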

Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by Robert Bradshaw <ro...@google.com>.
I like the idea of asking for a coder for T with properties X. (E.g. the
order-preserving one may not be the most efficient, so a poor default,
but required in some cases.)

Note that if we go the route of secondary-key-extraction, we don't even
need a full coder here, just an order-preserving encoding. (This has, as
mentioned, the disadvantage of possibly shuffling redundant data between the
order-providing key and the actual value.)

On Mon, Oct 22, 2018 at 9:46 PM Kenneth Knowles <ke...@apache.org> wrote:

> A related approach to Robert's that does not involve new types is to alter
> coder inference from the current:
>
> 1. Ask for a coder for type T
> 2. Check that the coder is (order preserving / deterministic)
>
> To:
>
> 1. Ask for an order preserving coder for T / ask for a deterministic coder
> for T
>
> This would allow recursive search for a list or KV coder that is order
> preserving. This could be implemented as a parallel code path in
> CoderRegistry without other changes, and invoked by transforms, even before
> any global changes to how coders are inferred. We'd have to be careful
> about pipeline upgrade compatibility.
>
> Kenn
>
> On Mon, Oct 22, 2018 at 12:40 PM David Morávek <da...@gmail.com>
> wrote:
>
>> Lukasz, you are right. I didn't think about structured coders. Thanks
>>
>> On Mon, Oct 22, 2018 at 7:40 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I don't believe an interface will work because KvCoder/ListCoder/...
>>> would only be order preserving if their component coders were order
>>> preserving.
>>>
>>> On Mon, Oct 22, 2018 at 8:52 AM David Morávek <da...@gmail.com>
>>> wrote:
>>>
>>>> What should be the next step? I guess we all agree that the Hadoop
>>>> dependency should be split out. Then we're left with the SortValues
>>>> transform + in-memory implementation. I'm ok with keeping this as a
>>>> separate module, as this would discourage users from using sorting in
>>>> their business logic.
>>>>
>>>> Robert:
>>>> Regarding the introduction of a new method for the coders: how about
>>>> creating a new interface, e.g. *OrderPreservingCoder*? Then you can
>>>> require this interface in your method signature and the IDE will
>>>> autocomplete all of the possible implementations that you can use. In
>>>> the case of a new method, the user needs to know which implementations
>>>> are order preserving, and it can be really confusing. I think the same
>>>> thinking should apply to other coder properties.
>>>>
>>>> D.
>>>>
>>>>
>>>>
>>>> On Thu, Oct 18, 2018 at 12:15 PM Niel Markwick <ni...@google.com>
>>>> wrote:
>>>>
>>>>> FYI: the BufferedExternalSorter depends on Hadoop client libraries
>>>>> (specifically hadoop_mapreduce_client_core and hadoop_common), but not on
>>>>> the Hadoop service -- because the  ExternalSorter
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java>
>>>>> uses Hadoop's SequenceFile
>>>>> <http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/io/SequenceFile.html> for
>>>>> on-disk sorting.

Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by Kenneth Knowles <ke...@apache.org>.
A related approach to Robert's that does not involve new types is to alter
coder inference from the current:

1. Ask for a coder for type T
2. Check that the coder is (order preserving / deterministic)

To:

1. Ask for an order preserving coder for T / ask for a deterministic coder
for T

This would allow recursive search for a list or KV coder that is order
preserving. This could be implemented as a parallel code path in
CoderRegistry without other changes, and invoked by transforms, even before
any global changes to how coders are inferred. We'd have to be careful
about pipeline upgrade compatibility.
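
A minimal sketch of what such a parallel lookup could look like, written
without Beam. Everything here is hypothetical: OrderPreservingLookupSketch,
TypeSpec and the coder names are stand-ins, coders are represented only by
their names, and this is not how CoderRegistry is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration only; not Beam's CoderRegistry. */
class OrderPreservingLookupSketch {

  /** Hypothetical type descriptor: either a leaf class or a KV of two descriptors. */
  static final class TypeSpec {
    final Class<?> leaf;
    final TypeSpec key;
    final TypeSpec value;

    private TypeSpec(Class<?> leaf, TypeSpec key, TypeSpec value) {
      this.leaf = leaf;
      this.key = key;
      this.value = value;
    }

    static TypeSpec of(Class<?> c) {
      return new TypeSpec(c, null, null);
    }

    static TypeSpec kv(TypeSpec k, TypeSpec v) {
      return new TypeSpec(null, k, v);
    }
  }

  // Order-preserving coders known for leaf types, represented by name only.
  private final Map<Class<?>, String> leafCoders = new HashMap<>();

  void register(Class<?> type, String coderName) {
    leafCoders.put(type, coderName);
  }

  /** Asks directly for an order-preserving coder, recursing into KV components. */
  Optional<String> orderPreservingCoderFor(TypeSpec t) {
    if (t.leaf != null) {
      return Optional.ofNullable(leafCoders.get(t.leaf));
    }
    Optional<String> k = orderPreservingCoderFor(t.key);
    Optional<String> v = orderPreservingCoderFor(t.value);
    if (!k.isPresent() || !v.isPresent()) {
      return Optional.empty(); // one component has no order-preserving coder
    }
    return Optional.of("KvCoder(" + k.get() + ", " + v.get() + ")");
  }
}
```

The point of the second code path is that the search can pick a different
coder than the default one for a type, instead of inferring the default and
rejecting it afterwards.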

Kenn


Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by David Morávek <da...@gmail.com>.
Lukasz, you are right. I didn't think about structured coders. Thanks


Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by Lukasz Cwik <lc...@google.com>.
I don't believe an interface will work because KvCoder/ListCoder/... would
only be order preserving if their component coders were order preserving.
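
A small sketch of why the property has to be computed per instance rather
than declared on the class. The Coder interface below is a hypothetical,
heavily simplified stand-in that models only this one property; it is not
Beam's Coder.

```java
/** Hypothetical illustration only; not Beam's Coder API. */
class CoderPropertySketch {

  /** Simplified coder: only the property under discussion is modelled. */
  interface Coder {
    boolean isOrderPreserving();
  }

  /** A leaf coder whose property is fixed by its encoding. */
  static Coder leaf(boolean orderPreserving) {
    return () -> orderPreserving;
  }

  /**
   * A composite (KV- or list-like) coder. It can only be order preserving if every
   * component is. This is a necessary condition; the sketch ignores the further
   * requirement that the concatenated encoding itself keeps the order.
   */
  static Coder composite(Coder... components) {
    return () -> {
      for (Coder c : components) {
        if (!c.isOrderPreserving()) {
          return false;
        }
      }
      return true;
    };
  }
}
```

The same composite class yields true for composite(leaf(true), leaf(true))
and false for composite(leaf(true), leaf(false)), so a marker interface on
the class could not express the difference.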

>>>>>>>>> The usage of SortValues transformation is little bit confusing. I
>>>>>>>>> think we should add a *SortValues.perKey* method, which accepts a
>>>>>>>>> secondary key extractor and coder, as the usage would be easier to
>>>>>>>>> understand. Also, this explicitly states, that we sort values
>>>>>>>>> *perKey* only and that we sort using an *encoded secondary key*.
>>>>>>>>> Example usage:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *PCollection<KV<String, Long>> input = ...;*
>>>>>>>>> *input.apply(SortValues.perKey(KV::getValue,
>>>>>>>>> BigEndianLongCoder.of()))*
>>>>>>>>>
>>>>>>>>> What do you think? Is this the right direction?
>>>>>>>>>
>>>>>>>>> Thanks for the comments!
>>>>>>>>>
>>>>>>>>> Links:
>>>>>>>>> -
>>>>>>>>> http://mail-archives.apache.org/mod_mbox/beam-dev/201805.mbox/%3Cl8D.1U3Hp.5IxQdKoVDzH.1R3dyk%40seznam.cz%3E
>>>>>>>>>
>>>>>>>>

Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by David Morávek <da...@gmail.com>.
What should be the next step? I guess we all agree that the Hadoop dependency
should be split out. Then we're left with the SortValues transform +
in-memory implementation. I'm OK with keeping this as a separate module, as
this would discourage users from using sorting in their business logic.

Robert:
Regarding the introduction of a new method for the coders: how about creating
a new interface, e.g. *OrderPreservingCoder*? Then you can require this
interface in your method signature, and the IDE will autocomplete all of the
possible implementations that you can use. With a new method, the user needs
to know which implementations are order preserving, which can be really
confusing. I think the same reasoning should apply to other coder properties.
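
To make the interface idea concrete, here is a minimal sketch in plain Java.
It does not use Beam's real Coder API: SimpleCoder, OrderPreservingCoder,
NonNegativeBigEndianLongCoder and sortByEncodedKey are all hypothetical names
invented for illustration. The assumed contract is that unsigned
lexicographic order of the encoded bytes matches the order of the values,
which holds for big-endian longs only while they are non-negative.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Sketch only: simplified stand-ins, not Beam's Coder API.
final class OrderPreservingCoderSketch {

  /** Minimal stand-in for a coder: turns a value into bytes. */
  interface SimpleCoder<T> {
    byte[] encode(T value);
  }

  /**
   * Hypothetical marker interface. Implementations promise that unsigned
   * lexicographic order of the encoded bytes matches the order of the values.
   */
  interface OrderPreservingCoder<T> extends SimpleCoder<T> {}

  /** Big-endian encoding keeps numeric order for non-negative longs. */
  static final class NonNegativeBigEndianLongCoder implements OrderPreservingCoder<Long> {
    @Override
    public byte[] encode(Long value) {
      if (value < 0) {
        // Two's complement negatives would sort after positives byte-wise.
        throw new IllegalArgumentException("negative values break byte order");
      }
      return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
  }

  /** Sorts by encoded secondary key; the signature only accepts order-preserving coders. */
  static <V, S> List<V> sortByEncodedKey(
      List<V> values, Function<V, S> secondaryKey, OrderPreservingCoder<S> coder) {
    List<V> sorted = new ArrayList<>(values);
    sorted.sort(
        (a, b) ->
            Arrays.compareUnsigned(
                coder.encode(secondaryKey.apply(a)), coder.encode(secondaryKey.apply(b))));
    return sorted;
  }

  public static void main(String[] args) {
    List<Long> sorted =
        sortByEncodedKey(
            Arrays.asList(300L, 2L, 70000L, 1L), v -> v, new NonNegativeBigEndianLongCoder());
    System.out.println(sorted); // prints [1, 2, 300, 70000]
  }
}
```

Passing a coder that does not implement the marker interface fails at compile
time, which is the discoverability benefit described above.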

D.




Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by Niel Markwick <ni...@google.com>.
FYI: the BufferedExternalSorter depends on Hadoop client libraries
(specifically hadoop_mapreduce_client_core and hadoop_common), but not on
the Hadoop service -- because the ExternalSorter
<https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java>
uses Hadoop's SequenceFile
<http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/io/SequenceFile.html>
for on-disk sorting.




Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by David Morávek <da...@gmail.com>.
Kenn, I believe we should not introduce a Hadoop dependency to either the
SDKs or the runners. We could split sorting into two modules: one with the
transformation + in-memory implementation (this is the part I'd love to see
become part of sdks-java-core) and a second module with the more robust
external sorter (with the Hadoop dep).

Does this make sense?



Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by Dan Halperin <dh...@apache.org>.
On Wed, Oct 17, 2018 at 3:44 PM Kenneth Knowles <ke...@apache.org> wrote:

> The runner can always just depend on the sorter to do it the legacy way by
> class matching; it shouldn't incur other dependency penalties... but now
> that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
> price to pay for a user in any event. Are those Hadoop deps reasonably
> self-contained?
>

Nice catch, Kenn! This is indeed why we didn't originally include the
Sorter in core. The Hadoop deps have an enormous surface, or did at the
time.

Dan



Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by Kenneth Knowles <ke...@apache.org>.
The runner can always just depend on the sorter to do it the legacy way by
class matching; it shouldn't incur other dependency penalties... but now
that I look briefly, the sorter depends on Hadoop bits. That seems a heavy
price to pay for a user in any event. Are those Hadoop deps reasonably
self-contained?

Kenn


Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by Lukasz Cwik <lc...@google.com>.
Merging the sorter into sdks-java-core isn't needed for pipelines executed
via portability, since the Runner will be able to perform PTransform
replacement and optimization based upon the URN of the transform and its
payload, so it would never need to have the "Sorter" class in its classpath.
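
A rough sketch of what URN-based replacement could look like, in plain Java
rather than against the real portability protos. TransformSpec, the URN
string and the register/plan methods are hypothetical names made up for
illustration; the point is only that the lookup is keyed by a string, so the
runner needs no SDK class on its classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch only: a toy registry, not the Beam Runner API.
final class UrnReplacementSketch {

  /** Simplified stand-in for a transform as seen in a portable pipeline. */
  static final class TransformSpec {
    final String urn;
    final byte[] payload;

    TransformSpec(String urn, byte[] payload) {
      this.urn = urn;
      this.payload = payload;
    }
  }

  // Hypothetical URN, chosen for this example only.
  static final String SORT_VALUES_URN = "example:transform:sort_values:v1";

  private final Map<String, Function<TransformSpec, String>> overrides = new HashMap<>();

  /** Registers a runner-specific execution strategy for a URN. */
  void register(String urn, Function<TransformSpec, String> planner) {
    overrides.put(urn, planner);
  }

  /** Picks the override if one is registered, else the SDK-provided expansion. */
  String plan(TransformSpec spec) {
    Function<TransformSpec, String> planner = overrides.get(spec.urn);
    return planner == null ? "default expansion" : planner.apply(spec);
  }

  public static void main(String[] args) {
    UrnReplacementSketch runner = new UrnReplacementSketch();
    runner.register(SORT_VALUES_URN, spec -> "sort-based shuffle");
    System.out.println(runner.plan(new TransformSpec(SORT_VALUES_URN, new byte[0])));
    System.out.println(runner.plan(new TransformSpec("example:other", new byte[0])));
  }
}
```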

I'm ambivalent about whether merging it now is worth it.


Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by David Morávek <da...@gmail.com>.
We can always fall back to the External sorter in case of merging windows.
I reckon in this case, values usually fit in memory, so it would not be an
issue.

In the case of non-merging windows, the runner implementation would probably
also need to group elements by window during the shuffle.
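
One way to picture this, as a sketch under assumptions rather than how any
runner actually does it: the shuffle sorts on a composite of (key, window,
secondary key), so every (key, window) group comes out contiguous and already
ordered. The Element class and sortForShuffle method below are hypothetical,
and the window is reduced to a single start timestamp for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch only: models a sort-based shuffle with an in-memory sort.
final class WindowedSortSketch {

  /** One shuffled record: primary key, window (start only), secondary sort key. */
  static final class Element {
    final String key;
    final long windowStart;
    final long secondaryKey;

    Element(String key, long windowStart, long secondaryKey) {
      this.key = key;
      this.windowStart = windowStart;
      this.secondaryKey = secondaryKey;
    }

    @Override
    public String toString() {
      return key + "/" + windowStart + "/" + secondaryKey;
    }
  }

  /** Orders by key, then window, then secondary key. */
  static List<Element> sortForShuffle(List<Element> input) {
    List<Element> out = new ArrayList<>(input);
    out.sort(
        Comparator.comparing((Element e) -> e.key)
            .thenComparingLong(e -> e.windowStart)
            .thenComparingLong(e -> e.secondaryKey));
    return out;
  }

  public static void main(String[] args) {
    List<Element> sorted =
        sortForShuffle(
            Arrays.asList(
                new Element("a", 10L, 5L),
                new Element("b", 0L, 1L),
                new Element("a", 0L, 7L),
                new Element("a", 10L, 2L),
                new Element("a", 0L, 3L)));
    System.out.println(sorted); // prints [a/0/3, a/0/7, a/10/2, a/10/5, b/0/1]
  }
}
```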


Re: [PROPOSAL] Move sorting to sdks-java-core

Posted by Reuven Lax <re...@google.com>.
One concern would be merging windows. This happens after shuffle, so even
if the shuffle were sorted you would need to do a sorted merge of two
sorted buffers.
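
For illustration, the sorted merge of two already-sorted buffers is the
standard two-pointer merge, sketched below in plain Java. The class and
method names are made up for this example, and the two lists stand in for the
per-window buffers of one key after their windows have merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: a generic two-way merge, not code from any runner.
final class SortedMergeSketch {

  /** Merges two ascending lists into one ascending list in O(n + m). */
  static <T extends Comparable<? super T>> List<T> mergeSorted(List<T> left, List<T> right) {
    List<T> merged = new ArrayList<>(left.size() + right.size());
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      // Take from the left on ties so the merge stays stable.
      if (left.get(i).compareTo(right.get(j)) <= 0) {
        merged.add(left.get(i++));
      } else {
        merged.add(right.get(j++));
      }
    }
    // At most one of the two lists still has elements left.
    while (i < left.size()) {
      merged.add(left.get(i++));
    }
    while (j < right.size()) {
      merged.add(right.get(j++));
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Long> windowA = Arrays.asList(1L, 4L, 9L);
    List<Long> windowB = Arrays.asList(2L, 3L, 10L);
    System.out.println(mergeSorted(windowA, windowB)); // prints [1, 2, 3, 4, 9, 10]
  }
}
```

The merge is linear in the combined size, so it is cheaper than re-sorting,
but it is still extra work that a sorted shuffle alone does not remove.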
