Posted to jira@kafka.apache.org by "Jason Gustafson (JIRA)" <ji...@apache.org> on 2018/02/18 08:12:00 UTC

[jira] [Commented] (KAFKA-6553) Consumer consumed committed messages

    [ https://issues.apache.org/jira/browse/KAFKA-6553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368467#comment-16368467 ] 

Jason Gustafson commented on KAFKA-6553:
----------------------------------------

It can sometimes happen that fetches are in flight when a consumer group rebalances. When the rebalance completes, the consumer is assigned a new set of partitions and resets the positions of those partitions to the latest committed offsets. After it does so, the in-flight fetches may return and either no longer be needed, because the partitions are no longer assigned, or no longer be at the right fetch offsets. In either case, we discard the data and send a new fetch at the current position. Typically this does not indicate a problem, but I'd have to see the full logs to be sure.
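
To make the two discard cases concrete, here is a minimal sketch of the check described above. It is not the actual consumer code: the class, enum, and method names are hypothetical, partitions are simplified to plain strings, and "no longer fetchable" is modeled only as "no longer assigned". The offsets in main are taken from the DEBUG lines quoted in the report below.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch only; all names are hypothetical and this is
 * not the real Kafka consumer implementation.
 */
final class StaleFetchSketch {

    enum Outcome { USE, DISCARD_NOT_ASSIGNED, DISCARD_WRONG_OFFSET }

    /**
     * Decide what to do with a fetch response that returns after a rebalance.
     *
     * @param positions   current position per assigned partition (post-rebalance)
     * @param partition   partition the returned fetch was sent for
     * @param fetchOffset offset at which that fetch was originally sent
     */
    static Outcome classify(Map<String, Long> positions, String partition, long fetchOffset) {
        Long position = positions.get(partition);
        if (position == null) {
            // Partition is no longer assigned to this consumer: data is not needed.
            return Outcome.DISCARD_NOT_ASSIGNED;
        }
        if (position.longValue() != fetchOffset) {
            // Position changed while the fetch was in flight: data is at the wrong offset.
            return Outcome.DISCARD_WRONG_OFFSET;
        }
        return Outcome.USE;
    }

    public static void main(String[] args) {
        // Assumed post-rebalance state, using numbers from the quoted log.
        Map<String, Long> positions = new HashMap<>();
        positions.put("UpdateNodeTopic-23", 352205L);
        positions.put("UpdateNodeTopic-11", 351929L);

        // Fetch sent at 352171, but the current position is 352205.
        System.out.println(classify(positions, "UpdateNodeTopic-23", 352171L));
        // Partition 26 is not in the assignment any more.
        System.out.println(classify(positions, "UpdateNodeTopic-26", 100L));
        // Fetch offset matches the current position, so the records are used.
        System.out.println(classify(positions, "UpdateNodeTopic-11", 351929L));
    }
}
```

In both discard cases the consumer simply sends a new fetch at the current position, which is why these DEBUG lines on their own are not evidence of duplicate delivery.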

> Consumer consumed committed messages
> ------------------------------------
>
>                 Key: KAFKA-6553
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6553
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Orel Shai
>            Priority: Critical
>
> Hi,
> We're using consumer kafka client 0.10.2.0 (that is working against Kafka broker 0.10.0) with the following configuration:
> {code:java}
> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
> props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
> props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
> props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
> {code}
> So as you can see we're using autocommit.
> The consumer API version that we're using has a dedicated thread for doing autocommit, so every second we have an autocommit, which means that we have a heartbeat every second.
> For some reason we're getting the same message lots of times.
> While looking at our logs I can see the following:
> {code:java}
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-15 to the committed offset 352878
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-3 to the committed offset 352458
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-19 to the committed offset 353775
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 to the committed offset 352171
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-7 to the committed offset 352995
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-27 to the committed offset 352531
> 2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 to the committed offset 351893
> 2018-02-11 10:56:24,656 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 at offset 352171 since the current position is 352205
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 at offset 351893 since the current position is 351929
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-26 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-17 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-29 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-5 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-8 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-20 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-2 since it is no longer fetchable
> 2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-14 since it is no longer fetchable
> {code}
> Consumer connection log:
> {code:java}
> 2018-02-12 08:18:13,506 DEBUG [DefaultThreadPool-9] Starting the Kafka consumer
> 2018-02-12 08:18:13,507 INFO [DefaultThreadPool-9] ConsumerConfig values: 
> 	auto.commit.interval.ms = 1000
> 	auto.offset.reset = latest
> 	bootstrap.servers = [list of servers]
> 	check.crcs = true
> 	client.id = 2cd03a2b-f040-4f7f-b20c-ce3fe5efbe00
> 	connections.max.idle.ms = 540000
> 	enable.auto.commit = true
> 	exclude.internal.topics = true
> 	fetch.max.bytes = 52428800
> 	fetch.max.wait.ms = 500
> 	fetch.min.bytes = 1
> 	group.id = UpdateNode.snbrepo.new
> 	heartbeat.interval.ms = 23333
> 	interceptor.classes = null
> 	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> 	max.partition.fetch.bytes = 16384
> 	max.poll.interval.ms = 300000
> 	max.poll.records = 100
> 	metadata.max.age.ms = 300000
> 	metric.reporters = []
> 	metrics.num.samples = 2
> 	metrics.recording.level = INFO
> 	metrics.sample.window.ms = 30000
> 	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RoundRobinAssignor]
> 	receive.buffer.bytes = 65536
> 	reconnect.backoff.ms = 50
> 	request.timeout.ms = 100000
> 	retry.backoff.ms = 100
> 	sasl.jaas.config = null
> 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
> 	sasl.kerberos.min.time.before.relogin = 60000
> 	sasl.kerberos.service.name = null
> 	sasl.kerberos.ticket.renew.jitter = 0.05
> 	sasl.kerberos.ticket.renew.window.factor = 0.8
> 	sasl.mechanism = GSSAPI
> 	security.protocol = PLAINTEXT
> 	send.buffer.bytes = 131072
> 	session.timeout.ms = 70000
> 	ssl.cipher.suites = null
> 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> 	ssl.endpoint.identification.algorithm = null
> 	ssl.key.password = null
> 	ssl.keymanager.algorithm = SunX509
> 	ssl.keystore.location = null
> 	ssl.keystore.password = null
> 	ssl.keystore.type = JKS
> 	ssl.protocol = TLS
> 	ssl.provider = null
> 	ssl.secure.random.implementation = null
> 	ssl.trustmanager.algorithm = PKIX
> 	ssl.truststore.location = null
> 	ssl.truststore.password = null
> 	ssl.truststore.type = JKS
> 	value.deserializer = <propriety deserializer>
> {code}
>  
> Do you know what might be the cause for it?
> Also, the processing time of a message may exceed the request timeout. If we're doing auto commit, does it count as a heartbeat? Is there going to be a rebalance in such cases?
> Thanks!
> Orel



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)