Posted to dev@flink.apache.org by Dawid Wysakowicz <wy...@gmail.com> on 2016/03/29 20:47:28 UTC

Range partitioning

Hi all,

I have recently been working on FLINK-2946
<https://issues.apache.org/jira/browse/FLINK-2946>, for which I am supposed to
use range partitioning, but I am not sure about its behaviour. I slightly
adjusted PartitionITCase#testRangePartitionerOnSequenceData to set a custom
parallelism after the partitioning, and the test now fails.
Is that the expected behaviour, or is it a bug?
I would be grateful for any comments.

The adjusted code:

@Test
public void testRangePartitionerOnSequenceData() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSource<Long> dataSource = env.generateSequence(0, 10000);
    KeySelector<Long, Long> keyExtractor = new ObjectSelfKeySelector();

    MapPartitionFunction<Long, Tuple2<Long, Long>> MinMaxSelector = new MinMaxSelector();

    Comparator<Tuple2<Long, Long>> tuple2Comparator = new Tuple2Comparator();

    // The adjustment: parallelism 3 is set on the mapPartition operator only,
    // while partitionByRange keeps the default parallelism.
    final MapPartitionOperator<Long, Tuple2<Long, Long>> dataSourceToCollect = dataSource
        .partitionByRange(keyExtractor)
        .mapPartition(MinMaxSelector)
        .setParallelism(3);

    List<Tuple2<Long, Long>> collected = dataSourceToCollect.collect();
    Collections.sort(collected, tuple2Comparator);

    // Each partition's minimum must directly follow the previous partition's maximum.
    long previousMax = -1;
    for (Tuple2<Long, Long> tuple2 : collected) {
        if (previousMax == -1) {
            previousMax = tuple2.f1;
        } else {
            long currentMin = tuple2.f0;
            assertTrue(tuple2.f0 < tuple2.f1);
            assertEquals(previousMax + 1, currentMin);
            previousMax = tuple2.f1;
        }
    }
}

Re: Range partitioning

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Dawid,

this is expected behavior. A partitioning is only valid up to the point
where you change the parallelism.

In the modified program the data will be correctly partitioned (let's say
into 8 partitions if the default parallelism is 8).
After the partitioning, the 8 partitions have to be reduced to 3 partitions,
as defined by the map-partition operator with parallelism 3. This is done
by random shuffling, which destroys the range partitioning.

You have to set the parallelism of the partition operator to 3 as well to
preserve the partitioning in the map-partition operator with parallelism 3.
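
The effect can be reproduced outside Flink with a small simulation. This is
only a sketch and not Flink's implementation: the class and method names
(RangePartitionSketch, rangePartition, redistribute, rangesAreContiguous) are
made up for illustration, the range boundaries are computed arithmetically
instead of by sampling, and a deterministic round-robin stands in for the
key-agnostic shuffle described above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-alone simulation; none of these names are Flink APIs.
final class RangePartitionSketch {

    /** Splits the keys [from, to] into `parts` contiguous ranges, as a range partitioner would. */
    static List<List<Long>> rangePartition(long from, long to, int parts) {
        List<List<Long>> out = emptyPartitions(parts);
        long count = to - from + 1;
        for (long v = from; v <= to; v++) {
            int target = (int) ((v - from) * parts / count);
            out.get(target).add(v);
        }
        return out;
    }

    /** Hands records to `parts` tasks without looking at the key (round-robin as an assumption). */
    static List<List<Long>> redistribute(List<List<Long>> input, int parts) {
        List<List<Long>> out = emptyPartitions(parts);
        int next = 0;
        for (List<Long> partition : input) {
            for (long v : partition) {
                out.get(next).add(v);
                next = (next + 1) % parts;
            }
        }
        return out;
    }

    /** Same check as the test: sorted by minimum, each range must start right after the previous maximum. */
    static boolean rangesAreContiguous(List<List<Long>> partitions) {
        List<long[]> minMax = new ArrayList<>();
        for (List<Long> p : partitions) {
            if (p.isEmpty()) {
                continue;
            }
            long min = Long.MAX_VALUE;
            long max = Long.MIN_VALUE;
            for (long v : p) {
                min = Math.min(min, v);
                max = Math.max(max, v);
            }
            minMax.add(new long[] {min, max});
        }
        minMax.sort(Comparator.comparingLong(a -> a[0]));
        for (int i = 1; i < minMax.size(); i++) {
            if (minMax.get(i)[0] != minMax.get(i - 1)[1] + 1) {
                return false;
            }
        }
        return true;
    }

    private static List<List<Long>> emptyPartitions(int parts) {
        List<List<Long>> out = new ArrayList<>();
        for (int i = 0; i < parts; i++) {
            out.add(new ArrayList<>());
        }
        return out;
    }

    public static void main(String[] args) {
        // Partitioned into 8 ranges, then squeezed into 3 tasks: the ranges overlap.
        List<List<Long>> broken = redistribute(rangePartition(0, 10000, 8), 3);
        // Partitioned directly into 3 ranges: the ranges stay contiguous.
        List<List<Long>> preserved = rangePartition(0, 10000, 3);
        System.out.println("8 -> 3 contiguous: " + rangesAreContiguous(broken));    // false
        System.out.println("3 -> 3 contiguous: " + rangesAreContiguous(preserved)); // true
    }
}
```

In the Flink program itself the corresponding change would presumably be to
call setParallelism(3) on the operator returned by partitionByRange as well,
so that both operators run with the same parallelism.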

Cheers, Fabian
