Posted to user@flink.apache.org by Yassine Marzougui <ya...@gmail.com> on 2016/08/15 10:38:15 UTC

Kafka topic with an empty partition: parallelism issue and Timestamp monotony violated

Hi all,

I have a Kafka topic with two partitions; messages within each partition
are ordered by ascending timestamp.

The following code works correctly (I'm running this on my local machine,
where the default parallelism is the number of cores, 8):

stream = env.addSource(myFlinkKafkaConsumer09)
stream.map(mapper)
  .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
  .keyBy(0)
  .timeWindow(Time.minutes(10))
  .reduce(reducer)
  .print()

But if I explicitly set
env.addSource(myFlinkKafkaConsumer09).setParallelism(n),
where n > (number of partitions = 2) and n != 8, I get a bunch of "Timestamp
monotony violated" warnings. My understanding is that only 2 source sub-tasks
will be mapped to the topic partitions and, since messages are ordered within
each partition, timestamp assignment should happen correctly regardless of
the parallelism, as long as it is >= 2.
*Question 1*: What is the explanation for this?


Now I add another, empty partition to the topic. The job no longer produces
any output, and that's expected since it keeps waiting forever for the
empty partition's watermark. What I don't understand, though, is a
strange behavior when I set the parallelism explicitly at the source:
*Question 2*: Why am I able to get output if I explicitly set
env.addSource(myFlinkKafkaConsumer09).setParallelism(n)?
Shouldn't the empty-partition argument apply here too? And why is that
output seen only when n != 8?

Best,
Yassine

Re: Kafka topic with an empty partition: parallelism issue and Timestamp monotony violated

Posted by Robert Metzger <rm...@apache.org>.
Hi Yassine,

In Flink 1.2 we've added a new feature to the Kafka consumer that allows you
to extract timestamps and emit watermarks per partition.
The consumers now have the following method:

public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
    AssignerWithPunctuatedWatermarks<T> assigner)

Using a timestamp extractor directly attached to the consumer, you don't
need to worry about the parallelism of subsequent operators.
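
To picture what per-partition watermarking means, here is a small stand-alone
Java model. It is only a sketch under assumptions and not Flink's
implementation: the class name PerPartitionWatermarkModel and its methods are
made up for illustration, and the watermark is simplified to the minimum of
the highest timestamps seen per partition.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of per-partition watermarking; not Flink's implementation.
class PerPartitionWatermarkModel {
    // Highest timestamp seen so far for each partition this source subtask reads.
    private final Map<Integer, Long> maxTimestamp = new HashMap<>();

    PerPartitionWatermarkModel(int... partitionIds) {
        for (int id : partitionIds) {
            maxTimestamp.put(id, Long.MIN_VALUE);
        }
    }

    /** Returns false if the timestamp is out of order within its own partition. */
    boolean onRecord(int partition, long timestamp) {
        long previous = maxTimestamp.getOrDefault(partition, Long.MIN_VALUE);
        if (timestamp < previous) {
            return false;
        }
        maxTimestamp.put(partition, timestamp);
        return true;
    }

    /** The subtask's watermark is the minimum over its partitions. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long max : maxTimestamp.values()) {
            min = Math.min(min, max);
        }
        return min;
    }

    public static void main(String[] args) {
        PerPartitionWatermarkModel source = new PerPartitionWatermarkModel(0, 1);
        // Records from the two partitions arrive interleaved: {partition, timestamp}.
        long[][] arrivals = {{0, 100}, {1, 5}, {0, 101}, {1, 6}};
        for (long[] a : arrivals) {
            boolean inOrder = source.onRecord((int) a[0], a[1]);
            System.out.println("partition " + a[0] + ", ts " + a[1] + ", in order: " + inOrder);
        }
        System.out.println("watermark: " + source.currentWatermark());
    }
}
```

Because order is checked per partition, interleaved records from different
partitions do not count as violations, whatever the parallelism of the
operators that follow.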



Re: Kafka topic with an empty partition: parallelism issue and Timestamp monotony violated

Posted by Yassine Marzougui <ya...@gmail.com>.
I think I also figured out the reason for the behavior I described when one
Kafka partition is empty.
According to the JavaDocs
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#DataStream-org.apache.flink.streaming.api.environment.StreamExecutionEnvironment-org.apache.flink.streaming.api.transformations.StreamTransformation->,
the datastream partitioning is set to *forward* by default, i.e. each map
sub-task will receive data from exactly one source sub-task. For one of the
stream partitions (corresponding to the empty Kafka partition) resulting
from the map operator, the watermark does not advance, which makes the
window operator wait forever.
Now if the map and source operators have a different parallelism, Flink
uses rebalance partitioning to redistribute the stream as pointed out in this
mailing list thread
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Forward-Partitioning-same-Parallelism-1-1-communication-tp2373p2382.html>,
therefore the watermark advances for all the stream partitions output from
the map operator.
Some of the details regarding the partitioning were mentioned in the 0.9
docs
<https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html#partitioning>,
but unfortunately they aren't in the 1.x docs
<https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#physical-partitioning>.
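
The reasoning above can be sketched as a small stand-alone Java model. It is
not Flink code and rests on assumptions: the class WatermarkStallModel, its
method names and the sample timestamps are invented for illustration, the
watermark of a map sub-task is simplified to the highest timestamp it has
seen, and the window operator is assumed to take the minimum over all of its
inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of forward vs. rebalance partitioning; not Flink code.
class WatermarkStallModel {

    /** Three source sub-tasks, one per Kafka partition; the third partition is empty. */
    static List<List<Long>> sampleSources() {
        return Arrays.asList(
                Arrays.asList(1000L, 2000L, 3000L),
                Arrays.asList(1500L, 2500L, 3500L),
                new ArrayList<Long>());
    }

    /** Forward partitioning: map sub-task i sees exactly the records of source sub-task i. */
    static List<List<Long>> forward(List<List<Long>> sources) {
        return sources;
    }

    /** Rebalance: every source sub-task spreads its records round-robin over all map sub-tasks. */
    static List<List<Long>> rebalance(List<List<Long>> sources, int mapParallelism) {
        List<List<Long>> mapInputs = new ArrayList<>();
        for (int i = 0; i < mapParallelism; i++) {
            mapInputs.add(new ArrayList<Long>());
        }
        for (List<Long> source : sources) {
            for (int k = 0; k < source.size(); k++) {
                mapInputs.get(k % mapParallelism).add(source.get(k));
            }
        }
        return mapInputs;
    }

    /** Watermark seen by the window operator: the minimum over all upstream sub-tasks. */
    static long windowWatermark(List<List<Long>> mapInputs) {
        long min = Long.MAX_VALUE;
        for (List<Long> records : mapInputs) {
            long max = Long.MIN_VALUE; // stays here if the sub-task never sees a record
            for (long ts : records) {
                max = Math.max(max, ts);
            }
            min = Math.min(min, max);
        }
        return min;
    }

    public static void main(String[] args) {
        List<List<Long>> sources = sampleSources();
        System.out.println("forward:   " + windowWatermark(forward(sources)));
        System.out.println("rebalance: " + windowWatermark(rebalance(sources, 2)));
    }
}
```

In this model the forward case stays at Long.MIN_VALUE because one map
sub-task never receives a record, while the rebalance case advances because
every map sub-task receives records from the non-empty partitions.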


Re: Kafka topic with an empty partition: parallelism issue and Timestamp monotony violated

Posted by Yassine Marzougui <ya...@gmail.com>.
I think I figured out the explanation for the first part. It looks like the
stream gets redistributed and merged between the source and the map operator
because their parallelisms differ, so the messages coming out of the map
operator are no longer in order. The "Timestamp monotony violated" warnings
disappeared when I set the source and the map operator to the same
parallelism.
I found out about operator chaining and tried to chain the source and map
operators (as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#tasks--operator-chains)
so that they have the same parallelism, but I didn't succeed. Isn't
env.addSource(...).setParallelism(n).startNewChain().map(...).disableChaining()
equivalent to setting the source and map parallelism to the same value?
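
The reordering described in the first paragraph of this message can be
reproduced with a small stand-alone Java model. This is a sketch under
assumptions, not Flink code: the class RebalanceOrderModel, its methods and
the sample timestamps are hypothetical, and records from the two source
sub-tasks are assumed to arrive alternately at each map sub-task (the real
interleaving depends on timing).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of round-robin redistribution; not Flink code.
class RebalanceOrderModel {

    /** Two partitions, each ordered on its own, but not relative to each other. */
    static List<List<Long>> samplePartitions() {
        return Arrays.asList(
                Arrays.asList(100L, 101L, 102L, 103L),
                Arrays.asList(0L, 1L, 2L, 3L));
    }

    /** Each source sub-task sends its k-th record to map sub-task k % mapParallelism. */
    static List<List<Long>> rebalance(List<List<Long>> sources, int mapParallelism) {
        List<List<Long>> mapInputs = new ArrayList<>();
        for (int i = 0; i < mapParallelism; i++) {
            mapInputs.add(new ArrayList<Long>());
        }
        int longest = 0;
        for (List<Long> source : sources) {
            longest = Math.max(longest, source.size());
        }
        // Assumed arrival order: the k-th records of all sources arrive one after another.
        for (int k = 0; k < longest; k++) {
            for (List<Long> source : sources) {
                if (k < source.size()) {
                    mapInputs.get(k % mapParallelism).add(source.get(k));
                }
            }
        }
        return mapInputs;
    }

    /** Counts records whose timestamp is lower than the highest one seen before in the same sub-task. */
    static int countViolations(List<List<Long>> mapInputs) {
        int violations = 0;
        for (List<Long> records : mapInputs) {
            long highest = Long.MIN_VALUE;
            for (long ts : records) {
                if (ts < highest) {
                    violations++;
                } else {
                    highest = ts;
                }
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = samplePartitions();
        // Same parallelism (forward): each map sub-task sees one partition unchanged.
        System.out.println("forward violations:   " + countViolations(partitions));
        // Different parallelism (rebalance): both partitions are mixed in every map sub-task.
        System.out.println("rebalance violations: " + countViolations(rebalance(partitions, 4)));
    }
}
```

With equal parallelism each map sub-task sees a single ordered partition, so
the model counts no violations; once the records are redistributed, every map
sub-task receives records from both partitions and the ascending-timestamp
assumption no longer holds.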


