You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Yukun Guo <gy...@gmail.com> on 2016/06/09 11:19:08 UTC

Strange behavior of DataStream.countWindow

I’m playing with the (Window)WordCount example from Flink QuickStart. I
generate a DataStream consisting of 1000 Strings of random digits, which is
windowed with a tumbling count window of 50 elements:

import org.apache.flink.api.common.functions.FlatMapFunction;import
org.apache.flink.api.java.functions.KeySelector;import
org.apache.flink.api.java.tuple.Tuple2;import
org.apache.flink.streaming.api.datastream.DataStream;import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import
org.apache.flink.util.Collector;
import java.util.Random;
public class DigitCount {


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements(
                "14159265358979323846264338327950288419716939937510",
                "58209749445923078164062862089986280348253421170679",
                "82148086513282306647093844609550582231725359408128",
                "48111745028410270193852110555964462294895493038196",
                "44288109756659334461284756482337867831652712019091",
                "45648566923460348610454326648213393607260249141273",
                "72458700660631558817488152092096282925409171536436",
                "78925903600113305305488204665213841469519415116094",
                "33057270365759591953092186117381932611793105118548",
                "07446237996274956735188575272489122793818301194912",
                "98336733624406566430860213949463952247371907021798",
                "60943702770539217176293176752384674818467669405132",
                "00056812714526356082778577134275778960917363717872",
                "14684409012249534301465495853710507922796892589235",
                "42019956112129021960864034418159813629774771309960",
                "51870721134999999837297804995105973173281609631859",
                "50244594553469083026425223082533446850352619311881",
                "71010003137838752886587533208381420617177669147303",
                "59825349042875546873115956286388235378759375195778",
                "18577805321712268066130019278766111959092164201989"
        );

        DataStream<Tuple2<Integer, Integer>> digitCount = text
                .flatMap(new Splitter())
                .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, Integer> x)
throws Exception {
                        return x.f0 % 2;
                    }
                })
                .countWindow(50)
                .sum(1);

        digitCount.print();
        env.execute();

    }

    public static final class Splitter implements
FlatMapFunction<String, Tuple2<Integer, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<Integer,
Integer>> out) {
            for (String token : value.split("")) {
                if (token.length() == 0) {
                    continue;
                }
                out.collect(Tuple2.of(Integer.parseInt(token), 1));
            }
        }
    }
}

The code above will produce 19 lines of output which is reasonable as the
1000 digits will be keyed into 2 partitions where one partition contains
500+ elements and the other contains slightly fewer than 500 elements,
therefore as a result one 50-digit window is ignored.

So far so good, but if I replace the mod KeySelector with a random one:

private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
    private int nPartitions;
    private Random random;

    RandomKeySelector(int nPartitions) {
        this.nPartitions = nPartitions;
        random = new Random();
    }

    @Override
    public Integer getKey(T dummy) throws Exception {
        return random.nextInt(this.nPartitions);
    }
}

and then

.keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))

it may generate 17 or 18 lines of output. How could that happen? Moreover,
if I set the number of partitions to 10, in theory the lines of output
should be no fewer than 11, but actually it can be only 9.

Please help me understand why countWindow behaves like this.

Re: Strange behavior of DataStream.countWindow

Posted by Fabian Hueske <fh...@gmail.com>.
If the data does not have a key (or you do not care about it) you can also
use a FlatMapFunction (or ProcessFunction) with Operator State. Operator
State is not bound to a key but to a parallel operator instance. Have a
look at the ListCheckpointed interface and its JavaDocs.

2017-06-23 18:27 GMT+02:00 Edward <eg...@hotmail.com>:

> So there is no way to do a countWindow(100) and preserve data locality?
>
> My use case is this: augment a data stream with new fields from DynamoDB
> lookup. DynamoDB allows batch get's of up to 100 records, so I am trying to
> collect 100 records before making that call. I have no other reason to do a
> repartitioning, so I am hoping to avoid incurring the cost of shipping all
> the data across the network to do this.
>
> If I use countWindowAll, I am limited to parallelism = 1, so all data gets
> repartitioned twice. And if I use keyBy().countWindow(), then it gets
> repartitioned by key. So in both cases I lose locality.
>
> Am I missing any other options?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-
> countWindow-tp7482p13981.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: Strange behavior of DataStream.countWindow

Posted by Edward <eg...@hotmail.com>.
So there is no way to do a countWindow(100) and preserve data locality?

My use case is this: augment a data stream with new fields from DynamoDB
lookup. DynamoDB allows batch get's of up to 100 records, so I am trying to
collect 100 records before making that call. I have no other reason to do a
repartitioning, so I am hoping to avoid incurring the cost of shipping all
the data across the network to do this. 

If I use countWindowAll, I am limited to parallelism = 1, so all data gets
repartitioned twice. And if I use keyBy().countWindow(), then it gets
repartitioned by key. So in both cases I lose locality.

Am I missing any other options?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-countWindow-tp7482p13981.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Strange behavior of DataStream.countWindow

Posted by Fabian Hueske <fh...@gmail.com>.
No, you will lose data locality if you use keyBy(), which is the only way
to obtain a KeyedStream.

2017-06-23 17:52 GMT+02:00 Edward <eg...@hotmail.com>:

> Thanks, Fabian.
> In this case, I could just extend your idea by creating some deterministic
> multiplier of the subtask index:
>
>       RichMapFunction<String, Tuple2<Integer,String>> keyByMap = new
> RichMapFunction<String, Tuple2<Integer,String>>() {
>               public Tuple2<Integer,String> map(String value) {
>                 int indexOfCounter = Math.abs(value.hashCode()) % 4;
>                 int key = (( getRuntimeContext().getIndexOfThisSubtask() +
> 1)  * (indexOfCounter + 1)) - 1;
>                 counters.get(key).add(1);
>                 return new Tuple2<>(key, value);
>             }
>         };
>
> With this idea, if there are 12 subtasks, then subtask 0 would create 4
> keys: 0, 12, 24, and 36.
>
> The big advantage of your idea was that it would keep the data local. Is
> this still true with my example here (where I'm applying a function to the
> subtask index)? That is, if a each partition is generating a unique set of
> keys (unique to that subtask), will it optimize to keep that set of keys
> local for the next downstream subtask?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-
> countWindow-tp7482p13978.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: Strange behavior of DataStream.countWindow

Posted by Edward <eg...@hotmail.com>.
Thanks, Fabian.
In this case, I could just extend your idea by creating some deterministic
multiplier of the subtask index:

      RichMapFunction<String, Tuple2&lt;Integer,String>> keyByMap = new
RichMapFunction<String, Tuple2&lt;Integer,String>>() {
              public Tuple2<Integer,String> map(String value) {
                int indexOfCounter = Math.abs(value.hashCode()) % 4;
                int key = (( getRuntimeContext().getIndexOfThisSubtask() +
1)  * (indexOfCounter + 1)) - 1;
                counters.get(key).add(1);
                return new Tuple2<>(key, value);
            }
        };

With this idea, if there are 12 subtasks, then subtask 0 would create 4
keys: 0, 12, 24, and 36.

The big advantage of your idea was that it would keep the data local. Is
this still true with my example here (where I'm applying a function to the
subtask index)? That is, if a each partition is generating a unique set of
keys (unique to that subtask), will it optimize to keep that set of keys
local for the next downstream subtask?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-countWindow-tp7482p13978.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Strange behavior of DataStream.countWindow

Posted by Fabian Hueske <fh...@gmail.com>.
Flink hashes the keys and computes the target partition using modulo. This
works well, if you have many keys but leads to skew if the number of keys
is close to the number of partitions.
If you use parittionCustom, you can explicitly define the target partition,
however, partitionCustom does not return a KeyedStream, so you cannot use
keyed state or windows there.

Not sure if that works for your usecase, but you could try to use more keys
to achieve a more uniform key distribution.

Best, Fabian

2017-06-23 15:34 GMT+02:00 Edward <eg...@hotmail.com>:

> Hi Fabian -
> I've tried this idea of creating a KeyedStream based on
> getRuntimeContext().getIndexOfThisSubtask(). However, not all target
> subtasks are receiving records.
>
> All subtasks have a parallelism of 12, so I have 12 source subtasks and 12
> target subtasks. I've confirmed that the call to getIndexOfThisSubtask is
> evenly distributed between 0 and 11. However, 4 out of the 12 target
> subtasks (the subtasks after the hash) are no receiving any data. This
> means
> it's not actually keeping all the data local, because at least 4 of the 12
> partitions could be getting sent to different TaskManagers.
>
> Do I need to do a .partitionCustom to ensure even/local distribution?
>
> Thanks,
> Edward
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-
> countWindow-tp7482p13971.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: Strange behavior of DataStream.countWindow

Posted by Edward <eg...@hotmail.com>.
Hi Fabian - 
I've tried this idea of creating a KeyedStream based on
getRuntimeContext().getIndexOfThisSubtask(). However, not all target
subtasks are receiving records.

All subtasks have a parallelism of 12, so I have 12 source subtasks and 12
target subtasks. I've confirmed that the call to getIndexOfThisSubtask is
evenly distributed between 0 and 11. However, 4 out of the 12 target
subtasks (the subtasks after the hash) are no receiving any data. This means
it's not actually keeping all the data local, because at least 4 of the 12
partitions could be getting sent to different TaskManagers.

Do I need to do a .partitionCustom to ensure even/local distribution?

Thanks,
Edward



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Strange-behavior-of-DataStream-countWindow-tp7482p13971.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Strange behavior of DataStream.countWindow

Posted by Fabian Hueske <fh...@gmail.com>.
If I understood you correctly, you want to compute windows in parallel
without using a key.
Are you aware that the results of such a computation is not deterministic
and kind of arbitrary?

If that is still OK for you, you can use a mapper to assign the current
parallel index as a key field, i.e., wrap the data in a Tuple2<Key,
PayLoad> and then do a keyBy(0). This will keep the data local. The mapper
should extend RichMapFunction. You can access the parallel index via
getRuntimeContext().getParallelSubTaskId().

Hope this helps.
Cheers, Fabian

2016-06-11 11:53 GMT+02:00 Yukun Guo <gy...@gmail.com>:

> Thx, now I use element.hashCode() % nPartitions and it works as expected.
>
> But I'm afraid it's not a best practice for just turning a plain (already
> paralellized) DataStream into a KeyedStream? Because it introduces some
> overhead due to physical repartitioning by key, which is unnecessary since
> I don't really care about keys.
>
> On 9 June 2016 at 22:00, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Yukun,
>>
>> the problem is that the KeySelector is internally invoked multiple times.
>> Hence it must be deterministic, i.e., it must extract the same key for
>> the same object if invoked multiple times.
>> The documentation is not discussing this aspect and should be extended.
>>
>> Thanks for pointing out this issue.
>>
>> Cheers,
>> Fabian
>>
>>
>> 2016-06-09 13:19 GMT+02:00 Yukun Guo <gy...@gmail.com>:
>>
>>> I’m playing with the (Window)WordCount example from Flink QuickStart. I
>>> generate a DataStream consisting of 1000 Strings of random digits,
>>> which is windowed with a tumbling count window of 50 elements:
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;
>>> import java.util.Random;
>>> public class DigitCount {
>>>
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         DataStream<String> text = env.fromElements(
>>>                 "14159265358979323846264338327950288419716939937510",
>>>                 "58209749445923078164062862089986280348253421170679",
>>>                 "82148086513282306647093844609550582231725359408128",
>>>                 "48111745028410270193852110555964462294895493038196",
>>>                 "44288109756659334461284756482337867831652712019091",
>>>                 "45648566923460348610454326648213393607260249141273",
>>>                 "72458700660631558817488152092096282925409171536436",
>>>                 "78925903600113305305488204665213841469519415116094",
>>>                 "33057270365759591953092186117381932611793105118548",
>>>                 "07446237996274956735188575272489122793818301194912",
>>>                 "98336733624406566430860213949463952247371907021798",
>>>                 "60943702770539217176293176752384674818467669405132",
>>>                 "00056812714526356082778577134275778960917363717872",
>>>                 "14684409012249534301465495853710507922796892589235",
>>>                 "42019956112129021960864034418159813629774771309960",
>>>                 "51870721134999999837297804995105973173281609631859",
>>>                 "50244594553469083026425223082533446850352619311881",
>>>                 "71010003137838752886587533208381420617177669147303",
>>>                 "59825349042875546873115956286388235378759375195778",
>>>                 "18577805321712268066130019278766111959092164201989"
>>>         );
>>>
>>>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>>>                 .flatMap(new Splitter())
>>>                 .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>>>                     @Override
>>>                     public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
>>>                         return x.f0 % 2;
>>>                     }
>>>                 })
>>>                 .countWindow(50)
>>>                 .sum(1);
>>>
>>>         digitCount.print();
>>>         env.execute();
>>>
>>>     }
>>>
>>>     public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
>>>         @Override
>>>         public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
>>>             for (String token : value.split("")) {
>>>                 if (token.length() == 0) {
>>>                     continue;
>>>                 }
>>>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> The code above will produce 19 lines of output which is reasonable as
>>> the 1000 digits will be keyed into 2 partitions where one partition
>>> contains 500+ elements and the other contains slightly fewer than 500
>>> elements, therefore as a result one 50-digit window is ignored.
>>>
>>> So far so good, but if I replace the mod KeySelector with a random one:
>>>
>>> private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
>>>     private int nPartitions;
>>>     private Random random;
>>>
>>>     RandomKeySelector(int nPartitions) {
>>>         this.nPartitions = nPartitions;
>>>         random = new Random();
>>>     }
>>>
>>>     @Override
>>>     public Integer getKey(T dummy) throws Exception {
>>>         return random.nextInt(this.nPartitions);
>>>     }
>>> }
>>>
>>> and then
>>>
>>> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>>>
>>> it may generate 17 or 18 lines of output. How could that happen?
>>> Moreover, if I set the number of partitions to 10, in theory the lines of
>>> output should be no fewer than 11, but actually it can be only 9.
>>>
>>> Please help me understand why countWindow behaves like this.
>>>
>>
>>
>

Re: Strange behavior of DataStream.countWindow

Posted by Yukun Guo <gy...@gmail.com>.
Thx, now I use element.hashCode() % nPartitions and it works as expected.

But I'm afraid it's not a best practice for just turning a plain (already
paralellized) DataStream into a KeyedStream? Because it introduces some
overhead due to physical repartitioning by key, which is unnecessary since
I don't really care about keys.

On 9 June 2016 at 22:00, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Yukun,
>
> the problem is that the KeySelector is internally invoked multiple times.
> Hence it must be deterministic, i.e., it must extract the same key for the
> same object if invoked multiple times.
> The documentation is not discussing this aspect and should be extended.
>
> Thanks for pointing out this issue.
>
> Cheers,
> Fabian
>
>
> 2016-06-09 13:19 GMT+02:00 Yukun Guo <gy...@gmail.com>:
>
>> I’m playing with the (Window)WordCount example from Flink QuickStart. I
>> generate a DataStream consisting of 1000 Strings of random digits, which
>> is windowed with a tumbling count window of 50 elements:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;
>> import java.util.Random;
>> public class DigitCount {
>>
>>
>>     public static void main(String[] args) throws Exception {
>>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         DataStream<String> text = env.fromElements(
>>                 "14159265358979323846264338327950288419716939937510",
>>                 "58209749445923078164062862089986280348253421170679",
>>                 "82148086513282306647093844609550582231725359408128",
>>                 "48111745028410270193852110555964462294895493038196",
>>                 "44288109756659334461284756482337867831652712019091",
>>                 "45648566923460348610454326648213393607260249141273",
>>                 "72458700660631558817488152092096282925409171536436",
>>                 "78925903600113305305488204665213841469519415116094",
>>                 "33057270365759591953092186117381932611793105118548",
>>                 "07446237996274956735188575272489122793818301194912",
>>                 "98336733624406566430860213949463952247371907021798",
>>                 "60943702770539217176293176752384674818467669405132",
>>                 "00056812714526356082778577134275778960917363717872",
>>                 "14684409012249534301465495853710507922796892589235",
>>                 "42019956112129021960864034418159813629774771309960",
>>                 "51870721134999999837297804995105973173281609631859",
>>                 "50244594553469083026425223082533446850352619311881",
>>                 "71010003137838752886587533208381420617177669147303",
>>                 "59825349042875546873115956286388235378759375195778",
>>                 "18577805321712268066130019278766111959092164201989"
>>         );
>>
>>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>>                 .flatMap(new Splitter())
>>                 .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>>                     @Override
>>                     public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
>>                         return x.f0 % 2;
>>                     }
>>                 })
>>                 .countWindow(50)
>>                 .sum(1);
>>
>>         digitCount.print();
>>         env.execute();
>>
>>     }
>>
>>     public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
>>         @Override
>>         public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
>>             for (String token : value.split("")) {
>>                 if (token.length() == 0) {
>>                     continue;
>>                 }
>>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>>             }
>>         }
>>     }
>> }
>>
>> The code above will produce 19 lines of output which is reasonable as the
>> 1000 digits will be keyed into 2 partitions where one partition contains
>> 500+ elements and the other contains slightly fewer than 500 elements,
>> therefore as a result one 50-digit window is ignored.
>>
>> So far so good, but if I replace the mod KeySelector with a random one:
>>
>> private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
>>     private int nPartitions;
>>     private Random random;
>>
>>     RandomKeySelector(int nPartitions) {
>>         this.nPartitions = nPartitions;
>>         random = new Random();
>>     }
>>
>>     @Override
>>     public Integer getKey(T dummy) throws Exception {
>>         return random.nextInt(this.nPartitions);
>>     }
>> }
>>
>> and then
>>
>> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>>
>> it may generate 17 or 18 lines of output. How could that happen?
>> Moreover, if I set the number of partitions to 10, in theory the lines of
>> output should be no fewer than 11, but actually it can be only 9.
>>
>> Please help me understand why countWindow behaves like this.
>>
>
>

Re: Strange behavior of DataStream.countWindow

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Yukun,

the problem is that the KeySelector is internally invoked multiple times.
Hence it must be deterministic, i.e., it must extract the same key for the
same object if invoked multiple times.
The documentation is not discussing this aspect and should be extended.

Thanks for pointing out this issue.

Cheers,
Fabian


2016-06-09 13:19 GMT+02:00 Yukun Guo <gy...@gmail.com>:

> I’m playing with the (Window)WordCount example from Flink QuickStart. I
> generate a DataStream consisting of 1000 Strings of random digits, which
> is windowed with a tumbling count window of 50 elements:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;
> import java.util.Random;
> public class DigitCount {
>
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         DataStream<String> text = env.fromElements(
>                 "14159265358979323846264338327950288419716939937510",
>                 "58209749445923078164062862089986280348253421170679",
>                 "82148086513282306647093844609550582231725359408128",
>                 "48111745028410270193852110555964462294895493038196",
>                 "44288109756659334461284756482337867831652712019091",
>                 "45648566923460348610454326648213393607260249141273",
>                 "72458700660631558817488152092096282925409171536436",
>                 "78925903600113305305488204665213841469519415116094",
>                 "33057270365759591953092186117381932611793105118548",
>                 "07446237996274956735188575272489122793818301194912",
>                 "98336733624406566430860213949463952247371907021798",
>                 "60943702770539217176293176752384674818467669405132",
>                 "00056812714526356082778577134275778960917363717872",
>                 "14684409012249534301465495853710507922796892589235",
>                 "42019956112129021960864034418159813629774771309960",
>                 "51870721134999999837297804995105973173281609631859",
>                 "50244594553469083026425223082533446850352619311881",
>                 "71010003137838752886587533208381420617177669147303",
>                 "59825349042875546873115956286388235378759375195778",
>                 "18577805321712268066130019278766111959092164201989"
>         );
>
>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>                 .flatMap(new Splitter())
>                 .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>                     @Override
>                     public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
>                         return x.f0 % 2;
>                     }
>                 })
>                 .countWindow(50)
>                 .sum(1);
>
>         digitCount.print();
>         env.execute();
>
>     }
>
>     public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
>             for (String token : value.split("")) {
>                 if (token.length() == 0) {
>                     continue;
>                 }
>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>             }
>         }
>     }
> }
>
> The code above will produce 19 lines of output which is reasonable as the
> 1000 digits will be keyed into 2 partitions where one partition contains
> 500+ elements and the other contains slightly fewer than 500 elements,
> therefore as a result one 50-digit window is ignored.
>
> So far so good, but if I replace the mod KeySelector with a random one:
>
> private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
>     private int nPartitions;
>     private Random random;
>
>     RandomKeySelector(int nPartitions) {
>         this.nPartitions = nPartitions;
>         random = new Random();
>     }
>
>     @Override
>     public Integer getKey(T dummy) throws Exception {
>         return random.nextInt(this.nPartitions);
>     }
> }
>
> and then
>
> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>
> it may generate 17 or 18 lines of output. How could that happen? Moreover,
> if I set the number of partitions to 10, in theory the lines of output
> should be no fewer than 11, but actually it can be only 9.
>
> Please help me understand why countWindow behaves like this.
>