Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2019/02/28 13:08:00 UTC
[jira] [Updated] (FLINK-6070) Suggestion: add ComparableTuple types
[ https://issues.apache.org/jira/browse/FLINK-6070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-6070:
----------------------------------
Component/s: (was: Core)
API / Type Serialization System
> Suggestion: add ComparableTuple types
> -------------------------------------
>
> Key: FLINK-6070
> URL: https://issues.apache.org/jira/browse/FLINK-6070
> Project: Flink
> Issue Type: Improvement
> Components: API / Type Serialization System
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
> Priority: Minor
>
> Since Java doesn't have built-in tuple types, I find myself using Flink tuples for a lot of tasks in Flink programs. One downside is that these tuples are not inherently comparable, so when you want to sort a collection of tuples, you have to provide a custom comparator.
> I created a tuple sorting class, shown below. Only the methods for Tuple2 are defined (at the bottom of the class); similar methods could be added for the other tuple types. I would like feedback on whether something like this would be considered useful for Flink before I submit a PR.
> {code}
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.Collections;
> import java.util.Comparator;
> import java.util.List;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> /** A class for sorting collections of tuples. */
> public class TupleSorter {
> /** Produce a Tuple comparator for the given number of fields, with the requested field priority and sort order. */
> private static Comparator<Tuple> newComparator(final int tupleLen, final int[] fieldPriority,
> final int[] sortDescendingIndices) {
> if (fieldPriority == null || fieldPriority.length != tupleLen) {
> throw new IllegalArgumentException("Invalid sort order");
> }
> boolean[] idxUsed = new boolean[tupleLen];
> for (int i = 0; i < fieldPriority.length; i++) {
> int idx = fieldPriority[i];
> if (idx < 0 || idx >= tupleLen) {
> throw new IllegalArgumentException("fieldPriority entry out of range: " + idx);
> }
> if (idxUsed[idx]) {
> throw new IllegalArgumentException("fieldPriority entry duplicated: " + idx);
> }
> idxUsed[idx] = true;
> }
> boolean[] sortDescending = new boolean[tupleLen];
> for (int i = 0; i < sortDescendingIndices.length; i++) {
> int idx = sortDescendingIndices[i];
> if (idx < 0 || idx >= tupleLen) {
> throw new IllegalArgumentException("sortDescendingIndices entry out of range: " + idx);
> }
> sortDescending[idx] = true;
> }
> return (tuple0, tuple1) -> {
> for (int i = 0; i < tupleLen; i++) {
> int idx = fieldPriority[i];
> @SuppressWarnings({ "rawtypes", "unchecked" })
> int diff = ((Comparable) tuple0.getField(idx)).compareTo((Comparable) tuple1.getField(idx));
> // sortDescending is indexed by field index, not by priority position.
> if (sortDescending[idx]) {
> diff = -diff;
> }
> if (diff != 0) {
> return diff;
> }
> }
> return 0;
> };
> }
> /**
> * Sort a list of tuples.
> *
> * @param list
> * The list of tuples.
> * @param fieldPriority
> * The sort priority for the fields (primary and secondary sort key): a permutation of the field indices 0
> * and 1. The default sort order within a field is ascending.
> * @param sortDescendingIndices
> * If provided, inverts the sort order for a given field index from ascending to descending order.
> */
> public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> void sort(final List<Tuple2<T0, T1>> list,
> final int[] fieldPriority, final int... sortDescendingIndices) {
> list.sort(newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices));
> }
> /**
> * Produce a sorted copy of a collection of tuples.
> *
> * @param collection
> * The collection of tuples.
> * @param fieldPriority
> * The sort priority for the fields (primary and secondary sort key): a permutation of the field indices 0
> * and 1. The default sort order within a field is ascending.
> * @param sortDescendingIndices
> * If provided, inverts the sort order for a given field index from ascending to descending order.
> */
> public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> ArrayList<Tuple2<T0, T1>> sortCopy(
> final Collection<Tuple2<T0, T1>> collection, final int[] fieldPriority,
> final int... sortDescendingIndices) {
> ArrayList<Tuple2<T0, T1>> list = new ArrayList<>(collection);
> Collections.sort(list, newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices));
> return list;
> }
> }
> {code}
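For illustration, here is a minimal sketch of how the field-priority and descending-order arguments described above are meant to interact. It is not part of the proposed patch: `Pair`, `byFields` and `demo` are hypothetical names, and `Pair` is only a stand-in for Flink's `Tuple2` so that the example runs without Flink on the classpath. It assumes the descending flags are looked up by field index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class TupleSortSketch {
    /** Hypothetical stand-in for Flink's Tuple2 (assumption: not the real class). */
    static final class Pair<A, B> {
        final A f0;
        final B f1;

        Pair(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        Object getField(int pos) {
            if (pos == 0) return f0;
            if (pos == 1) return f1;
            throw new IndexOutOfBoundsException(String.valueOf(pos));
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    /** Compare field by field in priority order; descending[idx] flips the order of field idx. */
    static <A extends Comparable<A>, B extends Comparable<B>> Comparator<Pair<A, B>> byFields(
            int[] fieldPriority, boolean[] descending) {
        return (p, q) -> {
            for (int idx : fieldPriority) {
                @SuppressWarnings({ "rawtypes", "unchecked" })
                int diff = ((Comparable) p.getField(idx)).compareTo(q.getField(idx));
                if (diff != 0) {
                    // Look the flag up by field index, not by position in fieldPriority.
                    return descending[idx] ? -diff : diff;
                }
            }
            return 0;
        };
    }

    /** Sort three pairs: field 0 ascending (primary key), field 1 descending (secondary key). */
    static List<Pair<String, Integer>> demo() {
        List<Pair<String, Integer>> list = new ArrayList<>(Arrays.asList(
                new Pair<>("b", 1), new Pair<>("a", 2), new Pair<>("a", 3)));
        Comparator<Pair<String, Integer>> cmp =
                byFields(new int[] { 0, 1 }, new boolean[] { false, true });
        list.sort(cmp);
        return list;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [(a,3), (a,2), (b,1)]
    }
}
```

With the `TupleSorter` class from the description, the equivalent call would presumably be `TupleSorter.sort(list, new int[] { 0, 1 }, 1)` on a `List<Tuple2<String, Integer>>`.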
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)