Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/16 17:26:00 UTC

[jira] [Commented] (BEAM-12923) Add Comparator to SortValues PTransformation

    [ https://issues.apache.org/jira/browse/BEAM-12923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507749#comment-17507749 ] 

Beam JIRA Bot commented on BEAM-12923:
--------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> Add Comparator to SortValues PTransformation
> --------------------------------------------
>
>                 Key: BEAM-12923
>                 URL: https://issues.apache.org/jira/browse/BEAM-12923
>             Project: Beam
>          Issue Type: Improvement
>          Components: extensions-java-sorter
>            Reporter: Nikhil Goyal
>            Priority: P2
>              Labels: stale-P2
>
> Code in Context:
> [https://github.com/apache/beam/tree/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter]
>  
> The current implementation only compares serialized bytes. It would be great to be able to sort the elements with a custom comparator. I was able to prototype a solution but then hit some roadblocks, so I decided to open this ticket to get some feedback.
> *First approach*
> We add a Comparator<SecondaryKey> to SortValues and propagate it down to MemorySorter and ExternalSorter. This requires adding type parameters to all the classes, including BufferedExternalSorter, MemorySorter, ExternalSorter, and the Sorter interface. Instead of building a List<KV<byte[], byte[]>>, we would have a List<KV<KeyT, ValueT>>, sorted with a comparator over KV<KeyT, ValueT>.
> Potential issues:
> 1) Since all the classes are public, this change is not backward compatible. If we expect users to interact only with the SortValues PTransform, we can keep SortValues backward compatible (if no comparator is specified, we fall back to the default binary comparator).
> 2) Both NativeExternalSorter and MemorySorter have logic to calculate the memory used, which gets more complicated once we keep deserialized objects in memory. We can work around this by calling `Runtime.getRuntime().freeMemory()` before and after deserializing objects to estimate their size. (GC may free memory while deserialization is in progress, which would make the reading inaccurate, so we would have to keep a running average of the memory allocated per record or track the ratio of serialized bytes to deserialized object size.)
> *Second approach*:
> We add a Comparator<SecondaryKey> to SortValues, generate a Comparator<byte[]> from it, and use that instead. A small code snippet showing what the comparator would look like:
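> // Adapts a typed KV comparator to the byte[] comparator used by the sorters.
> private static class OrderingComparator<KeyT, ValueT> implements Comparator<byte[]> {
>   private final Comparator<KV<KeyT, ValueT>> comparator;
>   private final KvCoder<KeyT, ValueT> kvCoder;
>   OrderingComparator(Comparator<KV<KeyT, ValueT>> comparator, KvCoder<KeyT, ValueT> kvCoder) {
>     this.comparator = comparator;
>     this.kvCoder = kvCoder;
>   }
>   @Override
>   public int compare(byte[] o1, byte[] o2) {
>     try {
>       // Decode both records on every comparison, then delegate to the typed comparator.
>       KV<KeyT, ValueT> kv1 = CoderUtils.decodeFromByteArray(kvCoder, o1);
>       KV<KeyT, ValueT> kv2 = CoderUtils.decodeFromByteArray(kvCoder, o2);
>       return comparator.compare(kv1, kv2);
>     } catch (CoderException e) {
>       throw new IllegalStateException("Failed to decode record for comparison", e);
>     }
>   }
> }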
> Potential issues:
> 1) The sort is slower than in the first approach because we deserialize both objects on every comparison.
> 2) Memory usage: we allocate objects inside the compare method. I am not sure whether they would be allocated only on the stack (because of escape analysis) or in the young generation. Either way, they should be cleaned up quickly, avoiding memory issues.
> I will create a patch after getting some feedback on this.
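
The memory-accounting idea from the first approach (tracking the ratio of serialized bytes to deserialized object size) could be sketched roughly as below. This is only an illustration under stated assumptions: SizeRatioEstimator and its methods are hypothetical names, not part of the Beam sorter extension, and the in-memory sizes passed to observe() are assumed to come from some sampled measurement such as a freeMemory() difference.

```java
/** Hypothetical helper (not a Beam class): estimates in-memory size from serialized size. */
final class SizeRatioEstimator {
  private long sampledSerializedBytes;
  private long sampledInMemoryBytes;

  /**
   * Records one sampled measurement. Non-positive readings are dropped, since a
   * freeMemory() difference can come out negative if GC runs between the two reads.
   */
  void observe(long serializedBytes, long inMemoryBytes) {
    if (serializedBytes <= 0 || inMemoryBytes <= 0) {
      return;
    }
    sampledSerializedBytes += serializedBytes;
    sampledInMemoryBytes += inMemoryBytes;
  }

  /** Estimates the in-memory size; falls back to the serialized size before any sample exists. */
  long estimate(long serializedBytes) {
    if (sampledSerializedBytes == 0) {
      return serializedBytes;
    }
    double ratio = (double) sampledInMemoryBytes / sampledSerializedBytes;
    return (long) Math.ceil(serializedBytes * ratio);
  }
}

final class SizeRatioEstimatorDemo {
  public static void main(String[] args) {
    SizeRatioEstimator estimator = new SizeRatioEstimator();
    estimator.observe(100, 250);
    estimator.observe(100, 350);
    estimator.observe(100, -40); // ignored: looks like GC noise
    System.out.println(estimator.estimate(50)); // prints 150
  }
}
```

Accumulating totals rather than averaging per-record ratios weights large records more heavily, which seems reasonable when the goal is to bound total buffer memory, but either choice would need validation against real workloads.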

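To illustrate why comparing serialized bytes can differ from comparing decoded values, and what the second approach's decode-then-compare adapter does, here is a minimal self-contained sketch. It deliberately avoids Beam classes: DecodingComparator is a hypothetical name, a plain Function<byte[], T> stands in for a Coder, and the unsigned lexicographic byte comparison is an assumed stand-in for the existing byte-based ordering.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical adapter: decodes both byte arrays, then delegates to a typed comparator. */
final class DecodingComparator<T> implements Comparator<byte[]> {
  private final Function<byte[], T> decoder; // stand-in for a Beam Coder
  private final Comparator<? super T> delegate;

  DecodingComparator(Function<byte[], T> decoder, Comparator<? super T> delegate) {
    this.decoder = decoder;
    this.delegate = delegate;
  }

  @Override
  public int compare(byte[] left, byte[] right) {
    // Both operands are decoded on every call; this is the per-comparison cost.
    return delegate.compare(decoder.apply(left), decoder.apply(right));
  }
}

final class DecodingComparatorDemo {
  static byte[] encode(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array(); // big-endian
  }

  static Integer decode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }

  /** Unsigned lexicographic comparison of raw bytes. */
  static int compareUnsignedBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  /** Encodes the values, sorts the encoded forms with the given order, and decodes the result. */
  static List<Integer> sortEncoded(List<Integer> values, Comparator<byte[]> order) {
    List<byte[]> encoded = new ArrayList<>();
    for (int v : values) {
      encoded.add(encode(v));
    }
    encoded.sort(order);
    List<Integer> result = new ArrayList<>();
    for (byte[] b : encoded) {
      result.add(decode(b));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(3, -1, 2);
    Comparator<byte[]> valueOrder =
        new DecodingComparator<Integer>(
            DecodingComparatorDemo::decode, Comparator.<Integer>naturalOrder());
    // Byte order puts -1 (0xFFFFFFFF) last; value order puts it first.
    System.out.println(sortEncoded(input, DecodingComparatorDemo::compareUnsignedBytes)); // [2, 3, -1]
    System.out.println(sortEncoded(input, valueOrder)); // [-1, 2, 3]
  }
}
```

The adapter keeps the sorters working on byte[] and so leaves their public signatures alone, at the price of two decodes per comparison.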


--
This message was sent by Atlassian Jira
(v8.20.1#820001)