Posted to user@flink.apache.org by ninad <nn...@gmail.com> on 2017/07/12 20:18:34 UTC

FlinkKafkaConsumer subscribes to partitions in restoredState only.

Hello,
We're noticing that FlinkKafkaConsumer subscribes only to the partitions that
are in its restored state, so partitions missing from the restored state are
never read. We have to restart the job for FlinkKafkaConsumer to read from all
partitions.

Here are the details:

Environment:
Flink-1.3.0, standalone cluster as well as hadoop-cloudera cluster
flink-connector-kafka-0.9_2.11:1.3.0

- Start a job which reads from a Kafka topic.
- Bring down all Kafka brokers.
- Bring the Kafka brokers back up.

At this point, we see this in the logs:

2017-07-12 19:53:23,661 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
Consumer subtask 0 will start reading 8 partitions with offsets in restored
state: {KafkaTopicPartition{topic='topic.event.filter', partition=0}=3,
KafkaTopicPartition{topic='topic.event.filter', partition=2}=18,
KafkaTopicPartition{topic='topic.event.filter', partition=8}=1,
KafkaTopicPartition{topic='topic.event.filter', partition=9}=-1,
KafkaTopicPartition{topic='topic.event.filter', partition=3}=17,
KafkaTopicPartition{topic='topic.event.filter', partition=4}=17,
KafkaTopicPartition{topic='topic.event.filter', partition=5}=17,
KafkaTopicPartition{topic='topic.event.filter', partition=6}=1}

Flink subscribes to only these 8 partitions because they are in the restored
state. The remaining partitions aren't subscribed to.

From the code, I don't see any place where partitions that are not in the
restored state get subscribed to.

Relevant code:

if (restoredState != null) {
	for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
		if (restoredState.containsKey(kafkaTopicPartition)) {
			subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
		}
	}

	LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
		getRuntimeContext().getIndexOfThisSubtask(),
		subscribedPartitionsToStartOffsets.size(),
		subscribedPartitionsToStartOffsets);
}
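
To make the effect of that restore branch concrete, here is a minimal, self-contained sketch that mimics its filtering logic with plain Java collections. It is not Flink code: the class name, the method name, and the use of strings in place of KafkaTopicPartition are assumptions made purely for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RestoreFilterSketch {

    // Mimics the quoted restore branch: a partition is subscribed only if
    // the restored state already holds an offset for it.
    public static Map<String, Long> subscribeFromRestoredState(
            List<String> discoveredPartitions, Map<String, Long> restoredState) {
        Map<String, Long> subscribed = new LinkedHashMap<>();
        for (String partition : discoveredPartitions) {
            if (restoredState.containsKey(partition)) {
                subscribed.put(partition, restoredState.get(partition));
            }
            // No else branch: partitions missing from restoredState are skipped.
        }
        return subscribed;
    }

    public static void main(String[] args) {
        // Hypothetical data: three partitions exist, two are in the checkpoint.
        List<String> discovered = Arrays.asList("topic-0", "topic-1", "topic-2");
        Map<String, Long> restored = new LinkedHashMap<>();
        restored.put("topic-0", 3L);
        restored.put("topic-2", 18L);

        Map<String, Long> subscribed = subscribeFromRestoredState(discovered, restored);
        System.out.println(subscribed); // prints {topic-0=3, topic-2=18}
    }
}
```

In this sketch "topic-1" is never subscribed, which matches what we observe after the restore.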

We're not setting 'setStartFromEarliest' or 'setStartFromLatest', so it's
using the default: 'setStartFromGroupOffsets'.

Are we missing a setting, or doing something wrong? Please let us know.
Thanks!

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumer-subscribes-to-partitions-in-restoredState-only-tp14233.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: FlinkKafkaConsumer subscribes to partitions in restoredState only.

Posted by ninad <nn...@gmail.com>.
Got it. Thanks, Gordon.




Re: FlinkKafkaConsumer subscribes to partitions in restoredState only.

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
One other note: the functionality has already been merged into the master branch.
You can also take a look at the feature documentation here [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-partition-discovery
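
As a rough sketch of how the feature described in [1] is switched on: per the Flink 1.4 documentation, partition discovery is disabled by default and is enabled by setting a non-negative value for the `flink.partition-discovery.interval-millis` key in the consumer properties. The broker address, group id, interval, and class name below are placeholder assumptions, and the example only builds the properties so that it runs without Flink on the classpath.

```java
import java.util.Properties;

public class PartitionDiscoveryConfigSketch {

    public static Properties consumerProperties() {
        Properties props = new Properties();
        // Placeholder connection settings; replace with real values.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");
        // Flink 1.4+: a non-negative interval enables periodic partition discovery.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties();
        // These properties would then be handed to the consumer constructor,
        // e.g. new FlinkKafkaConsumer09<>(topic, schema, props), which requires
        // the Flink Kafka connector dependency and is therefore not shown here.
        System.out.println(props.getProperty("flink.partition-discovery.interval-millis"));
    }
}
```

The property is ignored by Flink 1.3, so this only helps after upgrading.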

On 25 July 2017 at 1:22:10 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org) wrote:

Hi,

Sorry for not replying sooner; it seems this thread went unnoticed.

What you are experiencing is expected behavior. In Flink 1.3, new partitions are not picked up: on restore runs, only partitions that are in the checkpointed state are subscribed to.
One main reason is that partition discovery on restore runs does not work well with repartitionable list operator state.

This limitation will be removed in Flink 1.4, where the Flink Kafka Consumer will be able to discover partitions, even during processing and without restarting the job.
Unfortunately, this is not available in Flink 1.3.

Cheers,
Gordon


Re: FlinkKafkaConsumer subscribes to partitions in restoredState only.

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Sorry for not replying sooner; it seems this thread went unnoticed.

What you are experiencing is expected behavior. In Flink 1.3, new partitions are not picked up: on restore runs, only partitions that are in the checkpointed state are subscribed to.
One main reason is that partition discovery on restore runs does not work well with repartitionable list operator state.

This limitation will be removed in Flink 1.4, where the Flink Kafka Consumer will be able to discover partitions, even during processing and without restarting the job.
Unfortunately, this is not available in Flink 1.3.

Cheers,
Gordon
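
For contrast with the 1.3 behavior, the following is a hypothetical sketch of what merging newly discovered partitions into a restored subscription could look like. It is not Flink's implementation: the class, the method, the string partition names, and the EARLIEST marker (standing in for "start from the earliest available offset") are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DiscoveryMergeSketch {

    // Hypothetical marker meaning "start from the earliest available offset".
    public static final long EARLIEST = Long.MIN_VALUE;

    // Keeps restored offsets as they are and adds every partition that is not
    // yet known, marked to start from EARLIEST.
    public static Map<String, Long> mergeDiscovered(
            Map<String, Long> restoredState, List<String> discoveredPartitions) {
        Map<String, Long> subscribed = new LinkedHashMap<>(restoredState);
        for (String partition : discoveredPartitions) {
            subscribed.putIfAbsent(partition, EARLIEST);
        }
        return subscribed;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new LinkedHashMap<>();
        restored.put("topic-0", 3L);
        restored.put("topic-2", 18L);

        Map<String, Long> subscribed =
                mergeDiscovered(restored, Arrays.asList("topic-0", "topic-1", "topic-2"));
        // "topic-1" is now part of the subscription instead of being dropped.
        System.out.println(subscribed.keySet());
    }
}
```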


Re: FlinkKafkaConsumer subscribes to partitions in restoredState only.

Posted by ninad <nn...@gmail.com>.
Any update on this, guys?


