Posted to dev@kafka.apache.org by "Boquan Tang (JIRA)" <ji...@apache.org> on 2019/04/13 09:12:00 UTC

[jira] [Created] (KAFKA-8228) Exactly once semantics break during server restart for kafka-streams application

Boquan Tang created KAFKA-8228:
----------------------------------

             Summary: Exactly once semantics break during server restart for kafka-streams application
                 Key: KAFKA-8228
                 URL: https://issues.apache.org/jira/browse/KAFKA-8228
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.2.0
            Reporter: Boquan Tang


We are using 2.2.0 for the kafka-streams client and 2.0.1 on the server (broker) side.

We have a simple kafka-streams application that has the following topology:
{code:java}
Source: KSTREAM-SOURCE-0000000004 (topics: [deduped-adclick])
  --> KSTREAM-TRANSFORM-0000000005
Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store])
  --> KSTREAM-TRANSFORM-0000000006
  <-- KSTREAM-SOURCE-0000000004
Source: KSTREAM-SOURCE-0000000000 (topics: [advertiser-budget])
  --> KTABLE-SOURCE-0000000001
Source: KSTREAM-SOURCE-0000000002 (topics: [advertisement-budget])
  --> KTABLE-SOURCE-0000000003
Processor: KSTREAM-TRANSFORM-0000000006 (stores: [advertiser-budget-store, advertisement-budget-store])
  --> KSTREAM-SINK-0000000007
  <-- KSTREAM-TRANSFORM-0000000005
Sink: KSTREAM-SINK-0000000007 (topic: budget-adclick)
  <-- KSTREAM-TRANSFORM-0000000006
Processor: KTABLE-SOURCE-0000000001 (stores: [advertiser-budget-store])
  --> none
  <-- KSTREAM-SOURCE-0000000000
Processor: KTABLE-SOURCE-0000000003 (stores: [advertisement-budget-store])
  --> none
  <-- KSTREAM-SOURCE-0000000002
{code}
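For reference, the DSL wiring that produces a topology of this shape looks roughly like the sketch below. UidDedupeTransformer is the dedup transformer described next; BudgetDecoratorTransformer, adClickSerde and uidOffsetStoreBuilder are placeholder names for illustration only, not our actual code:
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

final StreamsBuilder builder = new StreamsBuilder();

// Window store for the dedup transformer; the store builder itself is shown
// after the transformer code below.
builder.addStateStore(uidOffsetStoreBuilder);

// Budget tables, materialized into the stores read by KSTREAM-TRANSFORM-0000000006.
builder.table("advertiser-budget", Materialized.as("advertiser-budget-store"));
builder.table("advertisement-budget", Materialized.as("advertisement-budget-store"));

// Dedup on uid/offset, decorate with budgets, then write to the sink topic.
builder.stream("deduped-adclick", Consumed.with(Serdes.String(), adClickSerde))
       .transform(UidDedupeTransformer::new, "uid-offset-store")
       .transform(BudgetDecoratorTransformer::new,
                  "advertiser-budget-store", "advertisement-budget-store")
       .to("budget-adclick", Produced.with(Serdes.String(), adClickSerde));
{code}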
The *Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store])* was added specifically to investigate this broken-EOS issue; its transform() looks like this (specific K/V class names removed):
{code:java}
@Override
public void init(final ProcessorContext context) {
    uidStore = (WindowStore<String, Long>) context.getStateStore(uidStoreName);
    this.context = context;
}

@Override
public KeyValue<K, V> transform(final K key, final V value) {
    final long offset = context.offset();
    final String uid = value.getUid();
    // clickTimestamp is the click's event time taken from the record (field access omitted here)
    final long beginningOfHour = Instant.ofEpochMilli(clickTimestamp)
            .atZone(ZoneId.systemDefault())
            .withMinute(0)
            .withSecond(0)
            .toEpochSecond() * 1000;
    // The store holds the last input offset seen for this uid in the current hour bucket;
    // seeing the exact same offset again means the record was re-processed.
    final Long maybeLastOffset = uidStore.fetch(uid, beginningOfHour);
    final boolean dupe = null != maybeLastOffset && offset == maybeLastOffset;
    uidStore.put(uid, offset, beginningOfHour);
    if (dupe) {
        LOGGER.warn("Find duplication in partition {}, uid is {}, current offset is {}, last offset is {}",
                context.partition(),
                uid,
                offset,
                maybeLastOffset);
        statsEmitter.count("duplication");
    }
    return dupe ? null : new KeyValue<>(key, value);
}
{code}
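For completeness, uid-offset-store is just a window store keyed by uid with hourly buckets. A minimal sketch of how such a store can be registered is below; the exact retention and window size are illustrative assumptions, not our production values:
{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

// Hourly buckets to match the beginningOfHour timestamps used in transform().
final StoreBuilder<WindowStore<String, Long>> uidOffsetStoreBuilder =
        Stores.windowStoreBuilder(
                Stores.persistentWindowStore("uid-offset-store",
                        Duration.ofHours(2),   // retention (assumed value)
                        Duration.ofHours(1),   // window size
                        false),                // do not retain duplicates
                Serdes.String(),
                Serdes.Long());

// Registered on the StreamsBuilder and referenced by name in transform(..., "uid-offset-store").
builder.addStateStore(uidOffsetStoreBuilder);
{code}
The store is changelog-backed by default, so under exactly_once its changelog writes are supposed to be part of the same transaction as the sink writes and the committed input offsets.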
Although it is not 100% reproducible, we found that after we restart one or more servers on the cluster side, duplicates show up:
{code:java}
2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer, transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 (*:9092) could not be established. Broker may not be available.
2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer, transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 (*:9092) could not be established. Broker may not be available.
2019-04-12T07:14:02Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer, transactionalId=adclick-budget-decorator-streams-0_12] Connection to node 2 (*:9092) could not be established. Broker may not be available.
2019-04-12T07:27:39Z WARN [org.apache.kafka.streams.processor.internals.StreamThread] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] stream-thread [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Detected task 0_9 that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Will try to rejoin the consumer group. Below is the detailed description of the task:
>TaskId: 0_9
>> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [advertiser-budget]
> children: [KTABLE-SOURCE-0000000001]
> KTABLE-SOURCE-0000000001:
> states: [advertiser-budget-store]
> KSTREAM-SOURCE-0000000004:
> topics: [deduped-adclick]
> children: [KSTREAM-TRANSFORM-0000000005]
> KSTREAM-TRANSFORM-0000000005:
> states: [uid-offset-store]
> children: [KSTREAM-TRANSFORM-0000000006]
> KSTREAM-TRANSFORM-0000000006:
> states: [advertiser-budget-store, advertisement-budget-store]
> children: [KSTREAM-SINK-0000000007]
> KSTREAM-SINK-0000000007:
> topic: StaticTopicNameExtractor(budget-adclick)
> KSTREAM-SOURCE-0000000002:
> topics: [advertisement-budget]
> children: [KTABLE-SOURCE-0000000003]
> KTABLE-SOURCE-0000000003:
> states: [advertisement-budget-store]
>Partitions [advertiser-budget-9, deduped-adclick-9, advertisement-budget-9]
2019-04-12T07:27:40Z WARN [org.apache.kafka.common.utils.AppInfoParser] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_18-producer
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:424)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
    at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createProducer(StreamThread.java:457)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.lambda$createTask$0(StreamThread.java:447)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:192)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:172)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:448)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:399)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:384)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:148)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:107)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:281)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:292)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:342)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
2019-04-12T07:30:28Z WARN [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Find duplication in partition 18, uid is 1d8868ce40umu002, current offset is 212770034, last offset is 212770034
2019-04-12T07:30:28Z WARN [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Find duplication in partition 18, uid is 1d8868du40u1u001, current offset is 212770036, last offset is 212770036{code}
Our kafka-streams application is configured simply, like this:
{code:java}
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaStreamsApplicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 1000);
{code}
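The KafkaStreams instance itself is built and started in the usual way; a minimal sketch, assuming streamsConfiguration is a java.util.Properties and builder is the StreamsBuilder for the topology above:
{code:java}
import org.apache.kafka.streams.KafkaStreams;

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.setUncaughtExceptionHandler((thread, throwable) ->
        LOGGER.error("Uncaught exception in {}", thread.getName(), throwable));
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
{code}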

In my understanding, the writes to uid-offset-store should be committed in the same transaction as the consumer offsets of the input topic (deduped-adclick), so in theory the duplication check should never hit. Am I understanding this correctly?
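To spell out my understanding: with processing.guarantee=exactly_once, each Streams commit should wrap the sink writes, the state-store changelog writes, and the consumed input offsets into a single producer transaction. Conceptually (this is just the underlying producer API, not Streams internals, and the record variables are placeholders) a commit amounts to something like:
{code:java}
// the task producer has transactional.id set and initTransactions() already called
producer.beginTransaction();

// writes performed while processing: the decorated output record and the
// changelog record for uid-offset-store
producer.send(new ProducerRecord<>("budget-adclick", key, decoratedValue));
producer.send(new ProducerRecord<>(applicationId + "-uid-offset-store-changelog", storeKey, storeValue));

// the consumed input offset is committed through the same transaction
producer.sendOffsetsToTransaction(
        Collections.singletonMap(
                new TopicPartition("deduped-adclick", 9),
                new OffsetAndMetadata(nextOffsetToConsume)),
        applicationId);  // consumer group id == application.id for Streams

producer.commitTransaction();
{code}
So either the uid-to-offset entry, the output record and the input offset all commit together, or none of them do; after an aborted transaction the store should be restored to the last committed state before the records are re-processed. That is why I would not expect the duplication check to ever fire.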

One unusual thing I noticed on the server side is that the group coordinator takes a long time to initialize after a server restarts, which caused the long gap between the node loss and the eventual task migration.

Please let me know if I should look into specific server-side logs. I can share any findings with you, or even run more destructive tests to reproduce the issue so we can investigate.


