You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2019/02/28 13:08:00 UTC

[jira] [Updated] (FLINK-6070) Suggestion: add ComparableTuple types

     [ https://issues.apache.org/jira/browse/FLINK-6070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-6070:
----------------------------------
    Component/s:     (was: Core)
                 API / Type Serialization System

> Suggestion: add ComparableTuple types
> -------------------------------------
>
>                 Key: FLINK-6070
>                 URL: https://issues.apache.org/jira/browse/FLINK-6070
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Type Serialization System
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>            Priority: Minor
>
> Since Java doesn't have built-in tuple types, I find myself using Flink tuples for a lot of tasks in Flink programs. One downside is that these tuples are not inherently comparable, so when you want to sort a collection of tuples, you have to provide a custom comparator.
> I created a tuple sorting class, as follows. (Only the methods for Tuple2 are defined at the bottom, similar methods could be added for other tuple types.) I wanted to get feedback on whether something like this would be considered useful for Flink before I submitted a PR.
> {code}
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.Collections;
> import java.util.Comparator;
> import java.util.List;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> /** A class for sorting collections of tuples. */
> public class TupleSorter {
>     /** Produce a Tuple comparator for the given number of fields, with the requested field priority and sort order. */
>     private static Comparator<Tuple> newComparator(final int tupleLen, final int[] fieldPriority,
>             final int[] sortDescendingIndices) {
>         if (fieldPriority == null || fieldPriority.length != tupleLen) {
>             throw new IllegalArgumentException("Invalid sort order");
>         }
>         boolean[] idxUsed = new boolean[tupleLen];
>         for (int i = 0; i < fieldPriority.length; i++) {
>             int idx = fieldPriority[i];
>             if (idx < 0 || idx >= tupleLen) {
>                 throw new IllegalArgumentException("fieldPriority entry out of range: " + idx);
>             }
>             if (idxUsed[idx]) {
>                 throw new IllegalArgumentException("fieldPriority entry duplicated: " + idx);
>             }
>             idxUsed[idx] = true;
>         }
>         boolean[] sortDescending = new boolean[tupleLen];
>         for (int i = 0; i < sortDescendingIndices.length; i++) {
>             int idx = sortDescendingIndices[i];
>             if (idx < 0 || idx >= tupleLen) {
>                 throw new IllegalArgumentException("sortDescendingIndices entry out of range: " + idx);
>             }
>             sortDescending[idx] = true;
>         }
>         return (tuple0, tuple1) -> {
>             for (int i = 0; i < tupleLen; i++) {
>                 int idx = fieldPriority[i];
>                 @SuppressWarnings({ "rawtypes", "unchecked" })
>                 int diff = ((Comparable) tuple0.getField(idx)).compareTo((Comparable) tuple1.getField(idx));
>                 if (sortDescending[i]) {
>                     diff = -diff;
>                 }
>                 if (diff != 0) {
>                     return diff;
>                 }
>             }
>             return 0;
>         };
>     }
>     /**
>      * Sort a list of tuples.
>      * 
>      * @param list
>      *            The list of tuples.
>      * @param fieldPriority
>      *            The sort priority for the fields (primary an secondary sort key): a permutation of the field indices 0
>      *            and 1. The default sort order within a field is ascending.
>      * @param sortDescendingIndices
>      *            If provided, inverts the sort order for a given field index from ascending to descending order.
>      */
>     public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> void sort(final List<Tuple2<T0, T1>> list,
>             final int[] fieldPriority, final int... sortDescendingIndices) {
>         list.sort(newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices));
>     }
>     /**
>      * Produce a sorted copy of a collection of tuples.
>      * 
>      * @param list
>      *            The list of tuples.
>      * @param fieldPriority
>      *            The sort priority for the fields (primary an secondary sort key): a permutation of the field indices 0
>      *            and 1. The default sort order within a field is ascending.
>      * @param sortDescendingIndices
>      *            If provided, inverts the sort order for a given field index from ascending to descending order.
>      */
>     public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> ArrayList<Tuple2<T0, T1>> sortCopy(
>             final Collection<Tuple2<T0, T1>> collection, final int[] fieldPriority,
>             final int... sortDescendingIndices) {
>         ArrayList<Tuple2<T0, T1>> list = new ArrayList<>(collection);
>         Collections.sort(list, newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices));
>         return list;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)