Posted to issues@flink.apache.org by dawidwys <gi...@git.apache.org> on 2016/04/02 13:16:44 UTC

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

GitHub user dawidwys opened a pull request:

    https://github.com/apache/flink/pull/1848

    [FLINK-3665] Implemented sort orders support in range partitioning

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dawidwys/flink withOrders

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1848.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1848
    
----
commit bbb46d6fc7c555ba458cb129805ca323ccc8e2d2
Author: dawid <da...@gmail.com>
Date:   2016-04-02T11:10:59Z

    [FLINK-3665] Implemented sort orders support in range partitioning

commit bdbaf8ab9dda530128fd92193c08ddc31b91b4a8
Author: dawid <da...@gmail.com>
Date:   2016-04-02T11:15:29Z

    Removed unnecessary code

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-212434056
  
    You can leave the commits as they are. I can merge them before the PR is merged. 
    The comment about the confusing nesting was not meant as a change request. It's fine how it is. Just requires a bit of thought ;-)



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-212590297
  
    Thanks @dawidwys, unrelated test failures are OK. 
    I'll run the tests again after rebasing and merging the PR.



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60409063
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 - 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() {
    +					@Override
    +					public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception {
    +						return new Tuple2<>(new Tuple2<>(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +				new LongComparator(true));
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0)
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long,Long>,Long>, Tuple2<Long, Long>>() {
    +					@Override
    +					public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values,
    +											 Collector<Tuple2<Long, Long>> out) throws Exception {
    +						for (Tuple2<Tuple2<Long, Long>, Long> value : values) {
    +							out.collect(value.f0);
    +						}
    +					}
    +				})
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 + 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<ComparablePojo, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<ComparablePojo, Long>>() {
    +					@Override
    +					public Tuple2<ComparablePojo, Long> map(Long value) throws Exception {
    +						return new Tuple2<>(new ComparablePojo(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final List<Tuple2<ComparablePojo, ComparablePojo>> collected = dataSet
    +				.partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() {
    +					@Override
    +					public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception {
    +						return value.f0;
    +					}
    +				})
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MinMaxSelector<>(new ComparablePojoComparator()))
    +				.mapPartition(new ExtractComparablePojo())
    +				.collect();
    +
    +		final Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator =
    +				new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() {
    +			@Override
    +			public int compare(Tuple2<ComparablePojo, ComparablePojo> o1,
    +							   Tuple2<ComparablePojo, ComparablePojo> o2) {
    +				return o1.f0.compareTo(o2.f1);
    +			}
    +		};
    +		Collections.sort(collected, pojoComparator);
    +
    +		ComparablePojo previousMax = null;
    +		for (Tuple2<ComparablePojo, ComparablePojo> element : collected) {
    +			if (previousMax == null) {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				previousMax = element.f1;
    +			} else {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				if (previousMax.first.equals(element.f0.first)) {
    +					assertEquals(previousMax.second - 1, element.f0.second.longValue());
    +				}
    +				previousMax = element.f1;
    +			}
    +		}
    +	}
    +
    +	private static class ExtractComparablePojo implements MapPartitionFunction<
    +			Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>,
    +			Tuple2<ComparablePojo, ComparablePojo>> {
    +
    +		@Override
    +		public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values,
    +								 Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception {
    +			for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> value : values) {
    +				out.collect(new Tuple2<>(value.f0.f0, value.f1.f0));
    +			}
    +		}
    +	}
    +
    +    private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable {
    +
    +		@Override
    +		public int compare(Tuple2<ComparablePojo, Long> o1,
    +						   Tuple2<ComparablePojo, Long> o2) {
    +			return o1.f0.compareTo(o2.f0);
    +		}
    +	}
    +
    +	private static class ComparablePojo implements Comparable<ComparablePojo> {
    +		private Long first;
    +		private Long second;
    +
    +		public Long getFirst() {
    +			return first;
    +		}
    +
    +		public void setFirst(Long first) {
    +			this.first = first;
    +		}
    +
    +		public Long getSecond() {
    +			return second;
    +		}
    +
    +		public void setSecond(Long second) {
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo(Long first,
    +							  Long second) {
    +			this.first = first;
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo() {
    +		}
    +
    +		@Override
    +		public int compareTo(ComparablePojo o) {
    +			final int firstResult = Long.compare(this.first, o.first);
    +			if (firstResult == 0) {
    +				return (-1) * Long.compare(this.second, o.second);
    +			}
    +
    +			return firstResult;
    +		}
    +	}
    +
     	private static class ObjectSelfKeySelector implements KeySelector<Long, Long> {
     		@Override
     		public Long getKey(Long value) throws Exception {
     			return value;
     		}
     	}
     
    -	private static class MinMaxSelector implements MapPartitionFunction<Long, Tuple2<Long, Long>> {
    +	private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> {
    +
    +		private final Comparator<T> comparator;
    +
    +		public MinMaxSelector(Comparator<T> comparator) {
    +			this.comparator = comparator;
    +		}
    +
     		@Override
    -		public void mapPartition(Iterable<Long> values, Collector<Tuple2<Long, Long>> out) throws Exception {
    -			long max = Long.MIN_VALUE;
    -			long min = Long.MAX_VALUE;
    -			for (long value : values) {
    -				if (value > max) {
    +		public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception {
    +			Iterator<T> itr = values.iterator();
    +			T min = itr.next();
    +			T max = min;
    +			T value;
    +			while (itr.hasNext()) {
    +				value= itr.next();
    +				if (comparator.compare(value, min) < 0) {
    +					min = value;
    +				}
    +				if (comparator.compare(value, max) > 0) {
     					max = value;
     				}
     
    -				if (value < min) {
    -					min = value;
    -				}
     			}
    -			Tuple2<Long, Long> result = new Tuple2<>(min, max);
    +
    +			Tuple2<T, T> result = new Tuple2<>(min, max);
     			out.collect(result);
     		}
     	}
     
    -	private static class Tuple2Comparator implements Comparator<Tuple2<Long, Long>> {
    +	private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable {
    +
    +		private final Comparator<T> firstComparator;
    +		private final Comparator<T> secondComparator;
    +
    +		public Tuple2Comparator(Comparator<T> comparator) {
    +			this(comparator, comparator);
    +		}
    +
    +		public Tuple2Comparator(Comparator<T> firstComparator,
    +								Comparator<T> secondComparator) {
    +			this.firstComparator = firstComparator;
    +			this.secondComparator = secondComparator;
    +		}
    +
     		@Override
    -		public int compare(Tuple2<Long, Long> first, Tuple2<Long, Long> second) {
    -			long result = first.f0 - second.f0;
    +		public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
    +			long result = firstComparator.compare(first.f0, second.f0);
     			if (result > 0) {
     				return 1;
     			} else if (result < 0) {
     				return -1;
     			}
     
    -			result = first.f1 - second.f1;
    +			result = secondComparator.compare(first.f1, second.f1);
    --- End diff --
    
    Right, but the second case is only applied in sorting. So in case the min-values are equal, it will fail on the check that prevMax < curMin.
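    The prevMax < curMin check described above can be sketched standalone; this is a hypothetical illustration (class and method names are not from the PR), where each partition contributes a (min, max) pair and the pairs have already been sorted by their min value:

    ```java
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    // Hypothetical sketch (not Flink code) of the disjointness check the test
    // relies on: after sorting the per-partition (min, max) pairs by min value,
    // each partition's min must be strictly greater than the previous
    // partition's max, otherwise the ranges overlap.
    public class RangeCheck {

        // pair[0] is the partition min, pair[1] the partition max.
        static <T> boolean disjoint(List<T[]> sortedMinMaxPairs, Comparator<T> cmp) {
            T previousMax = null;
            for (T[] pair : sortedMinMaxPairs) {
                if (previousMax != null && cmp.compare(previousMax, pair[0]) >= 0) {
                    return false; // prevMax >= curMin: overlapping partitions
                }
                previousMax = pair[1];
            }
            return true;
        }

        public static void main(String[] args) {
            List<Long[]> ok = Arrays.asList(new Long[]{0L, 4L}, new Long[]{5L, 9L});
            List<Long[]> overlapping = Arrays.asList(new Long[]{0L, 5L}, new Long[]{5L, 9L});
            System.out.println(RangeCheck.<Long>disjoint(ok, Comparator.naturalOrder()));          // true
            System.out.println(RangeCheck.<Long>disjoint(overlapping, Comparator.naturalOrder())); // false
        }
    }
    ```

    Note the comparison is on the min element only; whether equal second elements sort consistently is irrelevant to the overlap check, which is the point being made above.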



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-206969649
  
    Hi @dawidwys, thanks for the PR. I had a few comments. 
    Please let me know if you have any questions regarding the nested and flattened keys. 



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-210485959
  
    Hi @fhueske, I hope I have addressed all your comments, so it would be nice if you could have a look at it once again.



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-212432502
  
    Unfortunately I agree that it can be confusing, but I could not find an easier way to properly test it.
    I will fix the issue with the smaller-or-equal -> smaller check.
    
    Just a quick question regarding the PR: shall I squash those commits or leave them as they are?



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60074102
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java ---
    @@ -90,13 +90,20 @@ public SemanticProperties getSemanticProperties() {
     		private final PartitionMethod pMethod;
     		private final Partitioner<?> customPartitioner;
     		private final DataDistribution distribution;
    -		
    +		private final Ordering ordering;
    +
     		public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner<?> customPartitioner, DataDistribution distribution) {
    +			this(pMethod, pKeys, null, customPartitioner, distribution);
    +		}
    +
    +		public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Ordering ordering, Partitioner<?>
    +				customPartitioner, DataDistribution distribution) {
     			super(pKeys);
    -			
    +
     			this.pMethod = pMethod;
     			this.customPartitioner = customPartitioner;
     			this.distribution = distribution;
    +			this.ordering = ordering;
    --- End diff --
    
    Can you add a check that the `ordering` is valid (number of orders == number of keys, and the ordered keys == the partition keys)?
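    The requested validation could look roughly like the following; this is a hypothetical standalone sketch using plain int arrays (the real `PartitionDescriptor` would check against Flink's `Ordering`/`FieldSet` types, whose accessors are not shown in the diff):

    ```java
    import java.util.Arrays;

    // Hypothetical sketch of the requested consistency check: the number of
    // sort orders must equal the number of partition keys, and the ordered
    // fields must be exactly the key fields.
    public class OrderingCheck {

        static void validate(int[] keyFields, int[] orderedFields) {
            if (orderedFields.length != keyFields.length) {
                throw new IllegalArgumentException(
                    "Number of orders (" + orderedFields.length
                    + ") does not match number of keys (" + keyFields.length + ")");
            }
            if (!Arrays.equals(keyFields, orderedFields)) {
                throw new IllegalArgumentException(
                    "Ordered fields do not match the partition key fields");
            }
        }

        public static void main(String[] args) {
            validate(new int[]{0, 1}, new int[]{0, 1}); // valid: passes silently
            try {
                validate(new int[]{0, 1}, new int[]{0}); // too few orders
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
    ```

    Failing fast in the constructor keeps an inconsistent ordering from reaching the optimizer.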



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60406798
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 - 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() {
    +					@Override
    +					public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception {
    +						return new Tuple2<>(new Tuple2<>(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +				new LongComparator(true));
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0)
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long,Long>,Long>, Tuple2<Long, Long>>() {
    +					@Override
    +					public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values,
    +											 Collector<Tuple2<Long, Long>> out) throws Exception {
    +						for (Tuple2<Tuple2<Long, Long>, Long> value : values) {
    +							out.collect(value.f0);
    +						}
    +					}
    +				})
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 + 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<ComparablePojo, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<ComparablePojo, Long>>() {
    +					@Override
    +					public Tuple2<ComparablePojo, Long> map(Long value) throws Exception {
    +						return new Tuple2<>(new ComparablePojo(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final List<Tuple2<ComparablePojo, ComparablePojo>> collected = dataSet
    +				.partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() {
    +					@Override
    +					public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception {
    +						return value.f0;
    +					}
    +				})
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MinMaxSelector<>(new ComparablePojoComparator()))
    +				.mapPartition(new ExtractComparablePojo())
    +				.collect();
    +
    +		final Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator =
    +				new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() {
    +			@Override
    +			public int compare(Tuple2<ComparablePojo, ComparablePojo> o1,
    +							   Tuple2<ComparablePojo, ComparablePojo> o2) {
    +				return o1.f0.compareTo(o2.f1);
    +			}
    +		};
    +		Collections.sort(collected, pojoComparator);
    +
    +		ComparablePojo previousMax = null;
    +		for (Tuple2<ComparablePojo, ComparablePojo> element : collected) {
    +			if (previousMax == null) {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				previousMax = element.f1;
    +			} else {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				if (previousMax.first.equals(element.f0.first)) {
    +					assertEquals(previousMax.second - 1, element.f0.second.longValue());
    +				}
    +				previousMax = element.f1;
    +			}
    +		}
    +	}
    +
    +	private static class ExtractComparablePojo implements MapPartitionFunction<
    +			Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>,
    +			Tuple2<ComparablePojo, ComparablePojo>> {
    +
    +		@Override
    +		public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values,
    +								 Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception {
    +			for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> value : values) {
    +				out.collect(new Tuple2<>(value.f0.f0, value.f1.f0));
    +			}
    +		}
    +	}
    +
    +    private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable {
    +
    +		@Override
    +		public int compare(Tuple2<ComparablePojo, Long> o1,
    +						   Tuple2<ComparablePojo, Long> o2) {
    +			return o1.f0.compareTo(o2.f0);
    +		}
    +	}
    +
    +	private static class ComparablePojo implements Comparable<ComparablePojo> {
    +		private Long first;
    +		private Long second;
    +
    +		public Long getFirst() {
    +			return first;
    +		}
    +
    +		public void setFirst(Long first) {
    +			this.first = first;
    +		}
    +
    +		public Long getSecond() {
    +			return second;
    +		}
    +
    +		public void setSecond(Long second) {
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo(Long first,
    +							  Long second) {
    +			this.first = first;
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo() {
    +		}
    +
    +		@Override
    +		public int compareTo(ComparablePojo o) {
    +			final int firstResult = Long.compare(this.first, o.first);
    +			if (firstResult == 0) {
    +				return (-1) * Long.compare(this.second, o.second);
    +			}
    +
    +			return firstResult;
    +		}
    +	}
    +
     	private static class ObjectSelfKeySelector implements KeySelector<Long, Long> {
     		@Override
     		public Long getKey(Long value) throws Exception {
     			return value;
     		}
     	}
     
    -	private static class MinMaxSelector implements MapPartitionFunction<Long, Tuple2<Long, Long>> {
    +	private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> {
    +
    +		private final Comparator<T> comparator;
    +
    +		public MinMaxSelector(Comparator<T> comparator) {
    +			this.comparator = comparator;
    +		}
    +
     		@Override
    -		public void mapPartition(Iterable<Long> values, Collector<Tuple2<Long, Long>> out) throws Exception {
    -			long max = Long.MIN_VALUE;
    -			long min = Long.MAX_VALUE;
    -			for (long value : values) {
    -				if (value > max) {
    +		public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception {
    +			Iterator<T> itr = values.iterator();
    +			T min = itr.next();
    +			T max = min;
    +			T value;
    +			while (itr.hasNext()) {
    +				value= itr.next();
    +				if (comparator.compare(value, min) < 0) {
    +					min = value;
    +				}
    +				if (comparator.compare(value, max) > 0) {
     					max = value;
     				}
     
    -				if (value < min) {
    -					min = value;
    -				}
     			}
    -			Tuple2<Long, Long> result = new Tuple2<>(min, max);
    +
    +			Tuple2<T, T> result = new Tuple2<>(min, max);
     			out.collect(result);
     		}
     	}
     
    -	private static class Tuple2Comparator implements Comparator<Tuple2<Long, Long>> {
    +	private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable {
    +
    +		private final Comparator<T> firstComparator;
    +		private final Comparator<T> secondComparator;
    +
    +		public Tuple2Comparator(Comparator<T> comparator) {
    +			this(comparator, comparator);
    +		}
    +
    +		public Tuple2Comparator(Comparator<T> firstComparator,
    +								Comparator<T> secondComparator) {
    +			this.firstComparator = firstComparator;
    +			this.secondComparator = secondComparator;
    +		}
    +
     		@Override
    -		public int compare(Tuple2<Long, Long> first, Tuple2<Long, Long> second) {
    -			long result = first.f0 - second.f0;
    +		public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
    +			long result = firstComparator.compare(first.f0, second.f0);
     			if (result > 0) {
     				return 1;
     			} else if (result < 0) {
     				return -1;
     			}
     
    -			result = first.f1 - second.f1;
    +			result = secondComparator.compare(first.f1, second.f1);
    --- End diff --
    
    Ah, I just realized the comparator is used for different purposes. For example in `testRangePartitionerOnSequenceDataWithOrders` `Tuple2Comparator` is used in three places: 
    
    1. to compare `Tuple2<Long, Long>` inside the `MinMaxSelector`. For this the implementation is correct.
    2. to compare the `Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>` of `collected`. I was referring to this one when I said it should only compare on the min-element, i.e., `f0`. If there are two elements in `collected` that have the same min-value (which is a `Tuple2<Long, Long>` again), this would indicate an invalid partitioning.
    3. as a wrapped comparator to compare the `Tuple2<Long, Long>` fields when comparing the `Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>` of `collected`. Here, the implementation is also correct.
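    Case 2 could be made explicit with a comparator that looks only at the min element and rejects equal mins outright; a hypothetical sketch (not part of the PR, and using plain arrays for the (min, max) pairs):

    ```java
    import java.util.Comparator;

    // Hypothetical min-only comparator for the (min, max) pairs of `collected`:
    // two partitions sharing the same min value would indicate an invalid range
    // partitioning, so equality is treated as a failure instead of returning 0.
    public class MinOnlyComparator<T> implements Comparator<T[]> {

        private final Comparator<T> minComparator;

        public MinOnlyComparator(Comparator<T> minComparator) {
            this.minComparator = minComparator;
        }

        @Override
        public int compare(T[] first, T[] second) { // [0] = min, [1] = max
            int result = minComparator.compare(first[0], second[0]);
            if (result == 0) {
                throw new IllegalStateException(
                    "Two partitions share the same min value: invalid range partitioning");
            }
            return result;
        }

        public static void main(String[] args) {
            MinOnlyComparator<Long> cmp = new MinOnlyComparator<>(Comparator.naturalOrder());
            System.out.println(cmp.compare(new Long[]{0L, 4L}, new Long[]{5L, 9L}) < 0); // true
        }
    }
    ```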



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60410346
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,274 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    --- End diff --
    
    Maybe a duplicate response, but it is only applied in sorting. The overlapping will be checked in prevMax < curMin. 
    
    Of course I could add a comparator that throws an exception when comparing the second element, but I think it would only hurt readability.


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r58897094
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,6 +549,60 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 - 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test(expected = IllegalStateException.class)
    +	public void testHashPartitionWithOrders() throws Exception {
    --- End diff --
    
    Please move this test method and the `testRebalanceWithOrders()` method into a new test class `org.apache.flink.api.java.operator.PartitionOperatorTest` in the `flink-java` module. There are more tests for operators, like `GroupingTest`, that validate correct behavior of the API without actually executing programs. In fact, there should already have been a test class for the partition operator, but for some reason these tests are missing.
    
    It would be nice if you could add more tests to the PartitionOperatorTest, including
    - test valid and invalid number of orders for flat and nested keys
    - test valid and invalid key definitions for range and hash partitioning
    - test valid and invalid custom partitioners for range and hash partitioning
    - other corner cases that come to your mind.
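
A framework-free sketch of the validations such a test class would cover (class and method names here are hypothetical, and Flink's `Keys`/`Order` types are replaced by plain counts and strings for illustration):

```java
public class PartitionOrderChecks {

    /**
     * Mirrors the preconditions suggested for withOrders(...): orders are only
     * valid for range partitioning, and there must be exactly one order per
     * original (unflattened) key field.
     */
    public static void checkOrders(String partitionMethod, int numOriginalKeyFields, int numOrders) {
        if (!"RANGE".equals(partitionMethod)) {
            throw new IllegalStateException(
                "Orders cannot be applied for " + partitionMethod + " partition method");
        }
        if (numOriginalKeyFields != numOrders) {
            throw new IllegalArgumentException(
                "The number of orders (" + numOrders + ") must match the number of "
                + "partition key fields (" + numOriginalKeyFields + ")");
        }
    }

    public static void main(String[] args) {
        checkOrders("RANGE", 2, 2); // valid: range partitioning, matching counts
        boolean rejected = false;
        try {
            checkOrders("HASH", 1, 1); // orders on hash partitioning must be rejected
        } catch (IllegalStateException e) {
            rejected = true;
        }
        System.out.println(rejected ? "rejected" : "accepted");
    }
}
```

An actual `PartitionOperatorTest` would exercise the same checks through the `DataSet` API, e.g. expecting `IllegalStateException` from `partitionByHash(0).withOrders(...)`.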


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60075546
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 - 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() {
    +					@Override
    +					public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception {
    +						return new Tuple2<>(new Tuple2<>(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +				new LongComparator(true));
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0)
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long,Long>,Long>, Tuple2<Long, Long>>() {
    +					@Override
    +					public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values,
    +											 Collector<Tuple2<Long, Long>> out) throws Exception {
    +						for (Tuple2<Tuple2<Long, Long>, Long> value : values) {
    +							out.collect(value.f0);
    +						}
    +					}
    +				})
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 + 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<ComparablePojo, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<ComparablePojo, Long>>() {
    +					@Override
    +					public Tuple2<ComparablePojo, Long> map(Long value) throws Exception {
    +						return new Tuple2<>(new ComparablePojo(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final List<Tuple2<ComparablePojo, ComparablePojo>> collected = dataSet
    +				.partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() {
    +					@Override
    +					public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception {
    +						return value.f0;
    +					}
    +				})
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MinMaxSelector<>(new ComparablePojoComparator()))
    +				.mapPartition(new ExtractComparablePojo())
    +				.collect();
    +
    +		final Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator =
    +				new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() {
    +			@Override
    +			public int compare(Tuple2<ComparablePojo, ComparablePojo> o1,
    +							   Tuple2<ComparablePojo, ComparablePojo> o2) {
    +				return o1.f0.compareTo(o2.f1);
    +			}
    +		};
    +		Collections.sort(collected, pojoComparator);
    +
    +		ComparablePojo previousMax = null;
    +		for (Tuple2<ComparablePojo, ComparablePojo> element : collected) {
    +			if (previousMax == null) {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				previousMax = element.f1;
    +			} else {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				if (previousMax.first.equals(element.f0.first)) {
    +					assertEquals(previousMax.second - 1, element.f0.second.longValue());
    +				}
    +				previousMax = element.f1;
    +			}
    +		}
    +	}
    +
    +	private static class ExtractComparablePojo implements MapPartitionFunction<
    +			Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>,
    +			Tuple2<ComparablePojo, ComparablePojo>> {
    +
    +		@Override
    +		public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values,
    +								 Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception {
    +			for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> value : values) {
    +				out.collect(new Tuple2<>(value.f0.f0, value.f1.f0));
    +			}
    +		}
    +	}
    +
    +    private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable {
    +
    +		@Override
    +		public int compare(Tuple2<ComparablePojo, Long> o1,
    +						   Tuple2<ComparablePojo, Long> o2) {
    +			return o1.f0.compareTo(o2.f0);
    +		}
    +	}
    +
    +	private static class ComparablePojo implements Comparable<ComparablePojo> {
    +		private Long first;
    +		private Long second;
    +
    +		public Long getFirst() {
    +			return first;
    +		}
    +
    +		public void setFirst(Long first) {
    +			this.first = first;
    +		}
    +
    +		public Long getSecond() {
    +			return second;
    +		}
    +
    +		public void setSecond(Long second) {
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo(Long first,
    +							  Long second) {
    +			this.first = first;
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo() {
    +		}
    +
    +		@Override
    +		public int compareTo(ComparablePojo o) {
    +			final int firstResult = Long.compare(this.first, o.first);
    +			if (firstResult == 0) {
    +				return (-1) * Long.compare(this.second, o.second);
    +			}
    +
    +			return firstResult;
    +		}
    +	}
    +
     	private static class ObjectSelfKeySelector implements KeySelector<Long, Long> {
     		@Override
     		public Long getKey(Long value) throws Exception {
     			return value;
     		}
     	}
     
    -	private static class MinMaxSelector implements MapPartitionFunction<Long, Tuple2<Long, Long>> {
    +	private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> {
    +
    +		private final Comparator<T> comparator;
    +
    +		public MinMaxSelector(Comparator<T> comparator) {
    +			this.comparator = comparator;
    +		}
    +
     		@Override
    -		public void mapPartition(Iterable<Long> values, Collector<Tuple2<Long, Long>> out) throws Exception {
    -			long max = Long.MIN_VALUE;
    -			long min = Long.MAX_VALUE;
    -			for (long value : values) {
    -				if (value > max) {
    +		public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception {
    +			Iterator<T> itr = values.iterator();
    +			T min = itr.next();
    +			T max = min;
    +			T value;
    +			while (itr.hasNext()) {
    +				value= itr.next();
    +				if (comparator.compare(value, min) < 0) {
    +					min = value;
    +				}
    +				if (comparator.compare(value, max) > 0) {
     					max = value;
     				}
     
    -				if (value < min) {
    -					min = value;
    -				}
     			}
    -			Tuple2<Long, Long> result = new Tuple2<>(min, max);
    +
    +			Tuple2<T, T> result = new Tuple2<>(min, max);
     			out.collect(result);
     		}
     	}
     
    -	private static class Tuple2Comparator implements Comparator<Tuple2<Long, Long>> {
    +	private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable {
    +
    +		private final Comparator<T> firstComparator;
    +		private final Comparator<T> secondComparator;
    +
    +		public Tuple2Comparator(Comparator<T> comparator) {
    +			this(comparator, comparator);
    +		}
    +
    +		public Tuple2Comparator(Comparator<T> firstComparator,
    +								Comparator<T> secondComparator) {
    +			this.firstComparator = firstComparator;
    +			this.secondComparator = secondComparator;
    +		}
    +
     		@Override
    -		public int compare(Tuple2<Long, Long> first, Tuple2<Long, Long> second) {
    -			long result = first.f0 - second.f0;
    +		public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
    +			long result = firstComparator.compare(first.f0, second.f0);
     			if (result > 0) {
     				return 1;
     			} else if (result < 0) {
     				return -1;
     			}
     
    -			result = first.f1 - second.f1;
    +			result = secondComparator.compare(first.f1, second.f1);
    --- End diff --
    
    I think we should only order on the min-element of the records (`first.f0` and `second.f0`). If `first.f0 == second.f0`, the test must fail, because it would indicate that the same record ended up in two partitions, which is not allowed.


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60071774
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java ---
    @@ -183,9 +193,13 @@ public String toString() {
     		
     		public static final String SELECT_ALL_CHAR = "*";
     		public static final String SELECT_ALL_CHAR_SCALA = "_";
    +		private static final Pattern WILD_CARD_REGEX = Pattern.compile("[\\.]?("
    +				+ "\\" + SELECT_ALL_CHAR + "\\B|"
    --- End diff --
    
    Can we terminate the patterns with `$` instead of `\b` and `\B`?
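
For illustration, a simplified, hypothetical version of the wildcard pattern with the suggested `$` anchor (the constants `SELECT_ALL_CHAR` and `SELECT_ALL_CHAR_SCALA` are inlined as `*` and `_`):

```java
import java.util.regex.Pattern;

public class WildcardPatternDemo {

    // Simplified sketch of WILD_CARD_REGEX: an optional dot followed by a
    // select-all character, anchored to the end of the key expression with '$'
    // instead of the \b / \B word-boundary assertions.
    static final Pattern WILD_CARD = Pattern.compile("[\\.]?(\\*|_)$");

    static boolean endsWithWildcard(String keyExpression) {
        return WILD_CARD.matcher(keyExpression).find();
    }

    public static void main(String[] args) {
        System.out.println(endsWithWildcard("f2.p1.*")); // true: Java-style select-all
        System.out.println(endsWithWildcard("_"));       // true: Scala-style select-all
        System.out.println(endsWithWildcard("f0"));      // false: plain field key
    }
}
```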


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-211422717
  
    Hi @dawidwys, thanks for the update and the extensive tests you added! 
    I had a few minor comments and I am not sure about the tests that check the correctness of the range partitioning. Maybe I missed something, but it would be nice if you could check the `PartitionITCase` again. Thanks!


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-212608472
  
    Merging this PR


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r59176476
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -98,6 +101,14 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.customPartitioner = customPartitioner;
     		this.distribution = distribution;
     	}
    +
    +	public PartitionOperator<T> withOrders(Order... orders) {
    --- End diff --
    
    The output of `ek.getOriginalKeyFieldTypes` should be: 
    `TypeInformation[] {TypeInformation<Pojo1>, TypeInformation<Integer>}`.
    
    So the composite type `Pojo1` should not be flattened into its components, i.e., `(String, String)`. This is important, because a user would only specify two orders (one for each key expression) and the first one needs to be applied to all flat keys of `Pojo1`. 
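
A plain-Java sketch of the implied expansion (names are hypothetical; Flink types are replaced by per-key flat-field counts and order strings for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OrderExpansionSketch {

    /**
     * Expands one order per original (possibly composite) key into one order per
     * flat field, e.g. orders {ASC, DESC} over keys {Pojo1 (2 flat fields), Integer}
     * become {ASC, ASC, DESC}.
     */
    static String[] expandOrders(int[] flatFieldsPerKey, String[] orders) {
        if (flatFieldsPerKey.length != orders.length) {
            throw new IllegalArgumentException("Expected one order per original key field");
        }
        List<String> expanded = new ArrayList<>();
        for (int i = 0; i < orders.length; i++) {
            for (int j = 0; j < flatFieldsPerKey[i]; j++) {
                expanded.add(orders[i]); // the key's order applies to all its flat fields
            }
        }
        return expanded.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Pojo1 flattens to two String fields; the Integer key stays one field.
        System.out.println(Arrays.toString(
            expandOrders(new int[]{2, 1}, new String[]{"ASCENDING", "DESCENDING"})));
        // -> [ASCENDING, ASCENDING, DESCENDING]
    }
}
```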


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r58897266
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,6 +549,60 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    --- End diff --
    
    we need at least two more tests here:
    - test for range partitioning with key selector and order
    - test for range partitioning with a nested key


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r58894245
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java ---
    @@ -51,12 +53,19 @@
     	private Partitioner<?> customPartitioner;
     	
     	private DataDistribution distribution;
    +
    +	private Ordering ordering;
     	
     	
     	public PartitionOperatorBase(UnaryOperatorInformation<IN, IN> operatorInfo, PartitionMethod pMethod, int[] keys, String name) {
     		super(new UserCodeObjectWrapper<NoOpFunction>(new NoOpFunction()), operatorInfo, keys, name);
     		this.partitionMethod = pMethod;
     	}
    +
    +	public PartitionOperatorBase(UnaryOperatorInformation<IN, IN> operatorInfo, PartitionMethod pMethod, int[] keys, Order[] orders, String name) {
    --- End diff --
    
    I think this constructor is not used. So it can be removed. 
    Also `orders` is not set in this constructor. 


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60407528
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,274 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +            assertTrue("Min element in each partition should be smaller than max.",
    +                    tuple2Comparator.compare(tuple2.f0, tuple2.f1) <= 0);
    +			if (previousMax == null) {
    +				previousMax = tuple2.f1;
    +			} else {
    +                assertTrue("Partitions overlap. Previous max should be smaller than current min.",
    +                        tuple2Comparator.compare(previousMax, tuple2.f0) <= 0);
    --- End diff --
    
    `previousMax` must be strictly smaller than `tuple2.f0`, not smaller or equal. Otherwise, the same key would have been sent to different partitions.
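
A standalone sketch of that invariant (hypothetical helper; tuples replaced by plain `long` min/max pairs, already sorted by min):

```java
public class RangeDisjointnessCheck {

    /**
     * Checks that consecutive partition ranges are disjoint: prevMax must be
     * strictly smaller than curMin. Equality is also a violation, since it would
     * mean the same key ended up in two different partitions.
     */
    static boolean partitionsDisjoint(long[][] sortedMinMax) {
        for (int i = 1; i < sortedMinMax.length; i++) {
            long prevMax = sortedMinMax[i - 1][1];
            long curMin = sortedMinMax[i][0];
            if (prevMax >= curMin) { // '>=', not '>': equal boundaries are invalid
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(partitionsDisjoint(new long[][]{{0, 4999}, {5000, 10000}}));
        // true: ranges do not overlap
        System.out.println(partitionsDisjoint(new long[][]{{0, 5000}, {5000, 10000}}));
        // false: key 5000 appears in both partitions
    }
}
```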


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r59137767
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -98,6 +101,14 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.customPartitioner = customPartitioner;
     		this.distribution = distribution;
     	}
    +
    +	public PartitionOperator<T> withOrders(Order... orders) {
    --- End diff --
    
    Hi. I started working on this change, but I don't quite know how I should treat key expressions (especially ones with wildcards). 
    
    Let's take a complex example:
    
    ```
    TypeInformation<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ti =
    		new TupleTypeInfo<>(
    			BasicTypeInfo.INT_TYPE_INFO,
    			TypeExtractor.getForClass(Pojo1.class),
    			TypeExtractor.getForClass(PojoWithMultiplePojos.class)
    		);
    
    ek = new ExpressionKeys<>(new String[] {"f2.p1.*", "f0"}, ti);
    
    public static class Pojo1 {
    	public String a;
    	public String b;
    }
    public static class Pojo2 {
    	public String a2;
    	public String b2;
    }
    public static class PojoWithMultiplePojos {
    	public Pojo1 p1;
    	public Pojo2 p2;
    	public Integer i0;
    }
    ```
    
    What should be the output of `ek.getOriginalKeyFieldTypes`?


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60071991
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -98,6 +102,23 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.customPartitioner = customPartitioner;
     		this.distribution = distribution;
     	}
    +
    +	/**
    +	 * Sets the order of keys for range partitioning.
    +	 * NOTE: Only valid for {@link PartitionMethod.RANGE}.
    +	 *
    +	 * @param orders array of orders for each specified partition key
    +	 * @return The partitioneOperator with properly set orders for given keys
    +     */
    +	@PublicEvolving
    +	public PartitionOperator<T> withOrders(Order... orders) {
    +		Preconditions.checkState(pMethod == PartitionMethod.RANGE, "Orders cannot be applied for %s partition " +
    +				"method", pMethod);
    +		Preconditions.checkArgument(pKeys.getOriginalKeyFieldTypes().length == orders.length);
    --- End diff --
    
    Can you add a descriptive error message here?


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60411268
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,274 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    --- End diff --
    
    You are right. The other check will catch it as well. I agree, let's keep it as it is.


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-212430790
  
    It's a bit confusing with all these nested tuples and comparators, but I think I got it now. I left a comment on the comparator that I think should be changed. Please check and let me know what you think.


---

[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r58895362
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -98,6 +101,14 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.customPartitioner = customPartitioner;
     		this.distribution = distribution;
     	}
    +
    +	public PartitionOperator<T> withOrders(Order... orders) {
    --- End diff --
    
    We should check that the number of `orders` is the same as the number of specified keys. Unfortunately, this is not as trivial as it sounds, because `Keys` does not give access to the originally specified keys but only to the flattened logical keys. If a program specifies a `Tuple2<Long, Long>` as a key, it will only specify a single order, but the logical keys will be flattened to `[Long, Long]`. I think we should extend the `Keys` class with a method `TypeInformation<?>[] getOriginalKeyFieldTypes()` which returns the unflattened field types. Using that method we can see how many flat fields exist for each specified key field.



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-212581664
  
    I think I have applied all the changes.
    
    Unfortunately, the Travis build fails, but I don't know why. The failing test has nothing to do with my changes, and it also passes locally.



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60075060
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    --- End diff --
    
    I am not sure the tests to check the correctness of the range partitioning are correct. The MinMaxSelector returns the smallest and largest records of each partition. We must ensure that the partitions are not overlapping. I propose to
    
    1. check for each minmax record that min <= max (just to be sure)
    2. sort all minmax records in `collected` by the min value (ignore the max value)
    3. check that the min value of each minmax record is larger than the max value of the previous minmax record.
    
    Does that make sense?
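    The three steps above could be sketched roughly like this (a standalone toy using `long[]` pairs `{min, max}` in place of Flink's tuple types; `partitionsDisjoint` is a hypothetical helper name, not part of the PR):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy version of the proposed non-overlap check for min/max
// partition summaries.
public class RangeCheck {

    // Returns true if the summaries describe valid, non-overlapping partitions.
    public static boolean partitionsDisjoint(List<long[]> minMaxPairs) {
        // 1. within each partition, min must not exceed max
        for (long[] p : minMaxPairs) {
            if (p[0] > p[1]) {
                return false;
            }
        }
        // 2. sort the summaries by their min value only (ignore max)
        List<long[]> sorted = new ArrayList<>(minMaxPairs);
        sorted.sort(Comparator.comparingLong(p -> p[0]));
        // 3. each min must be strictly larger than the previous max
        for (int i = 1; i < sorted.size(); i++) {
            if (sorted.get(i)[0] <= sorted.get(i - 1)[1]) {
                return false;
            }
        }
        return true;
    }
}
```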



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60408415
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,274 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    --- End diff --
    
    The outer `Comparator<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>` which wraps `tuple2Comparator` should only check on the first (min) value and fail if the min-values of two elements of `collected` are identical.
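    Such an outer comparator could look roughly like this (a standalone sketch over `long[]` `{min, max}` pairs rather than the actual nested `Tuple2` types; throwing from the comparator is one way to make the surrounding sort fail on duplicate min values):

```java
import java.util.Comparator;

// Sketch of an outer comparator that orders partition summaries by
// their min value only, and fails if two partitions report the same min.
public class MinOnlyComparator implements Comparator<long[]> {

    @Override
    public int compare(long[] a, long[] b) {
        int result = Long.compare(a[0], b[0]);
        if (result == 0) {
            // Identical min values would mean the ranges overlap,
            // so fail fast instead of falling back to the max value.
            throw new AssertionError(
                "Two partitions have an identical min value: " + a[0]);
        }
        return result;
    }
}
```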



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1848#issuecomment-206876652
  
    Thanks for working on this issue and opening a PR @dawidwys!
    I'll have a look at it and give feedback soon.



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r58895433
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -98,6 +101,14 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.customPartitioner = customPartitioner;
     		this.distribution = distribution;
     	}
    +
    +	public PartitionOperator<T> withOrders(Order... orders) {
    --- End diff --
    
    Please add the `@PublicEvolving` annotation to this method.



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60073978
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java ---
    @@ -90,13 +90,20 @@ public SemanticProperties getSemanticProperties() {
     		private final PartitionMethod pMethod;
     		private final Partitioner<?> customPartitioner;
     		private final DataDistribution distribution;
    -		
    +		private final Ordering ordering;
    +
     		public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner<?> customPartitioner, DataDistribution distribution) {
    --- End diff --
    
    Can you remove this constructor? I think the optimizer will fail if no `ordering` is set.



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1848



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60391643
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 - 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() {
    +					@Override
    +					public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception {
    +						return new Tuple2<>(new Tuple2<>(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +				new LongComparator(true));
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0)
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long,Long>,Long>, Tuple2<Long, Long>>() {
    +					@Override
    +					public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values,
    +											 Collector<Tuple2<Long, Long>> out) throws Exception {
    +						for (Tuple2<Tuple2<Long, Long>, Long> value : values) {
    +							out.collect(value.f0);
    +						}
    +					}
    +				})
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 + 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<ComparablePojo, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<ComparablePojo, Long>>() {
    +					@Override
    +					public Tuple2<ComparablePojo, Long> map(Long value) throws Exception {
    +						return new Tuple2<>(new ComparablePojo(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final List<Tuple2<ComparablePojo, ComparablePojo>> collected = dataSet
    +				.partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() {
    +					@Override
    +					public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception {
    +						return value.f0;
    +					}
    +				})
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MinMaxSelector<>(new ComparablePojoComparator()))
    +				.mapPartition(new ExtractComparablePojo())
    +				.collect();
    +
    +		final Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator =
    +				new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() {
    +			@Override
    +			public int compare(Tuple2<ComparablePojo, ComparablePojo> o1,
    +							   Tuple2<ComparablePojo, ComparablePojo> o2) {
    +				return o1.f0.compareTo(o2.f1);
    +			}
    +		};
    +		Collections.sort(collected, pojoComparator);
    +
    +		ComparablePojo previousMax = null;
    +		for (Tuple2<ComparablePojo, ComparablePojo> element : collected) {
    +			if (previousMax == null) {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				previousMax = element.f1;
    +			} else {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				if (previousMax.first.equals(element.f0.first)) {
    +					assertEquals(previousMax.second - 1, element.f0.second.longValue());
    +				}
    +				previousMax = element.f1;
    +			}
    +		}
    +	}
    +
    +	private static class ExtractComparablePojo implements MapPartitionFunction<
    +			Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>,
    +			Tuple2<ComparablePojo, ComparablePojo>> {
    +
    +		@Override
    +		public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values,
    +								 Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception {
    +			for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> value : values) {
    +				out.collect(new Tuple2<>(value.f0.f0, value.f1.f0));
    +			}
    +		}
    +	}
    +
    +    private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable {
    +
    +		@Override
    +		public int compare(Tuple2<ComparablePojo, Long> o1,
    +						   Tuple2<ComparablePojo, Long> o2) {
    +			return o1.f0.compareTo(o2.f0);
    +		}
    +	}
    +
    +	private static class ComparablePojo implements Comparable<ComparablePojo> {
    +		private Long first;
    +		private Long second;
    +
    +		public Long getFirst() {
    +			return first;
    +		}
    +
    +		public void setFirst(Long first) {
    +			this.first = first;
    +		}
    +
    +		public Long getSecond() {
    +			return second;
    +		}
    +
    +		public void setSecond(Long second) {
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo(Long first,
    +							  Long second) {
    +			this.first = first;
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo() {
    +		}
    +
    +		@Override
    +		public int compareTo(ComparablePojo o) {
    +			final int firstResult = Long.compare(this.first, o.first);
    +			if (firstResult == 0) {
    +				return (-1) * Long.compare(this.second, o.second);
    +			}
    +
    +			return firstResult;
    +		}
    +	}
    +
     	private static class ObjectSelfKeySelector implements KeySelector<Long, Long> {
     		@Override
     		public Long getKey(Long value) throws Exception {
     			return value;
     		}
     	}
     
    -	private static class MinMaxSelector implements MapPartitionFunction<Long, Tuple2<Long, Long>> {
    +	private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> {
    +
    +		private final Comparator<T> comparator;
    +
    +		public MinMaxSelector(Comparator<T> comparator) {
    +			this.comparator = comparator;
    +		}
    +
     		@Override
    -		public void mapPartition(Iterable<Long> values, Collector<Tuple2<Long, Long>> out) throws Exception {
    -			long max = Long.MIN_VALUE;
    -			long min = Long.MAX_VALUE;
    -			for (long value : values) {
    -				if (value > max) {
    +		public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception {
    +			Iterator<T> itr = values.iterator();
    +			T min = itr.next();
    +			T max = min;
    +			T value;
    +			while (itr.hasNext()) {
    +				value = itr.next();
    +				if (comparator.compare(value, min) < 0) {
    +					min = value;
    +				}
    +				if (comparator.compare(value, max) > 0) {
     					max = value;
     				}
     
    -				if (value < min) {
    -					min = value;
    -				}
     			}
    -			Tuple2<Long, Long> result = new Tuple2<>(min, max);
    +
    +			Tuple2<T, T> result = new Tuple2<>(min, max);
     			out.collect(result);
     		}
     	}
     
    -	private static class Tuple2Comparator implements Comparator<Tuple2<Long, Long>> {
    +	private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable {
    +
    +		private final Comparator<T> firstComparator;
    +		private final Comparator<T> secondComparator;
    +
    +		public Tuple2Comparator(Comparator<T> comparator) {
    +			this(comparator, comparator);
    +		}
    +
    +		public Tuple2Comparator(Comparator<T> firstComparator,
    +								Comparator<T> secondComparator) {
    +			this.firstComparator = firstComparator;
    +			this.secondComparator = secondComparator;
    +		}
    +
     		@Override
    -		public int compare(Tuple2<Long, Long> first, Tuple2<Long, Long> second) {
    -			long result = first.f0 - second.f0;
    +		public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
    +			long result = firstComparator.compare(first.f0, second.f0);
     			if (result > 0) {
     				return 1;
     			} else if (result < 0) {
     				return -1;
     			}
     
    -			result = first.f1 - second.f1;
    +			result = secondComparator.compare(first.f1, second.f1);
    --- End diff --
    
    This comparator is not used for comparing whole partitions of the format ((min, max), (min, max)). It is rather used for comparing min with max when they are tuples, so I think the comparison on the second key is vital.
    
    Imagine, for example, range partitioning on two fields of a cartesian product (1 to 2) x (1 to 50), where the resulting partitions could be:
    
    1. min: (1,50)  max: (1,17)
    2. min: (1,16)  max: (2,33)
    3. min: (2,32)  max: (2,1)



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60392270
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    --- End diff --
    
    In fact I tried to do something similar to what you described, with some differences.
    Ad 1. I performed this step in two different places - I simplified it a bit and added a comment.
    Ad 2. I sort on both fields, but I don't think that is a problem, as the min value is of greater importance.
    Ad 3. In fact I forgot about this step, which I have now added.
    Ad 4. I also check for cases like the one described in my response to your comment on line 809 regarding the Tuple2Comparator, ensuring that partitioning on the second part of a composite key is done properly.



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60407974
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    --- End diff --
    
    Yes, looks correct (except for the `<=` instead of `<`). Thanks for clarifying!



[GitHub] flink pull request: [FLINK-3665] Implemented sort orders support i...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60409121
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,274 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    +            assertTrue("Min element in each partition should be smaller than max.",
    +                    tuple2Comparator.compare(tuple2.f0, tuple2.f1) <= 0);
    +			if (previousMax == null) {
    +				previousMax = tuple2.f1;
    +			} else {
    +                assertTrue("Partitions overlap. Previous max should be smaller than current min.",
    +                        tuple2Comparator.compare(previousMax, tuple2.f0) <= 0);
    --- End diff --
    
    Right, will fix it.

