Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2017/11/24 05:15:00 UTC
[jira] [Comment Edited] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264926#comment-16264926 ]
Matthias J. Sax edited comment on KAFKA-6269 at 11/24/17 5:14 AM:
------------------------------------------------------------------
I had a look at the code, and I think I understand the issue: the guard that checks that the log end offset does not change while restoring is not working as expected, which is what I suspected.
At the moment, the only workaround I can think of is to read the table source topics as {{KStream}}s and apply a dummy aggregation that only keeps the latest value, turning each {{KStream}} into a {{KTable}}. This creates a proper changelog topic and avoids hitting this issue.
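A minimal sketch of that workaround, assuming the Kafka Streams 1.0.x Java API called from Scala, as in the report (the imports follow the 1.0 package layout: {{Consumed}} lives in {{org.apache.kafka.streams}} there, and {{Serialized}} was later replaced by {{Grouped}}). The object {{LatestValueTable}}, the method {{latestValueTable}} and the store name are hypothetical. The reducer discards the old value, so the resulting {{KTable}} holds the latest value per key; change logging stays enabled, so the store gets its own changelog topic instead of being restored from the source topic.

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.{Consumed, StreamsBuilder}
import org.apache.kafka.streams.kstream.{KTable, Materialized, Reducer, Serialized}
import org.apache.kafka.streams.state.Stores

object LatestValueTable {
  // Hypothetical helper (Kafka Streams 1.0.x API assumed): reads `topic` as a
  // KStream and reduces each key to its newest value. Logging is left enabled,
  // so the store is backed by its own changelog topic.
  def latestValueTable[V](builder: StreamsBuilder,
                          topic: String,
                          storeName: String,
                          valueSerde: Serde[V]): KTable[String, V] = {
    // "Dummy" aggregation: ignore the previous value and keep the new one.
    val keepLatest = new Reducer[V] {
      override def apply(oldValue: V, newValue: V): V = newValue
    }
    builder
      .stream(topic, Consumed.`with`(Serdes.String(), valueSerde))
      .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
      .reduce(keepLatest,
        Materialized.as[String, V](Stores.inMemoryKeyValueStore(storeName))
          .withKeySerde(Serdes.String())
          .withValueSerde(valueSerde))
  }
}
```

One behavioural difference from {{builder.table()}}: {{groupByKey()}} aggregations skip records with a null value, so tombstones in the source topic do not delete keys from this table.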
For your special case, since you write the topic with {{to()}}, you could also replace the {{to()}} with a {{groupByKey().aggregate()}}; then you don't need the {{entity-X-exists}} topics anymore. This should be a no-overhead solution for you: there will be no repartition topic, because you only call {{mapValues}} (which does not change the key) before {{groupByKey().aggregate()}}.
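Applied to the reporter's {{buildTable}}, the suggestion could look roughly like the sketch below (same Kafka Streams 1.0.x API assumption; the object {{ExistsTableWithoutTopic}} and the {{storeName}} parameter are hypothetical). The {{to()}} call and the {{builder.table()}} read-back are gone: {{mapValues}} keeps the key, so {{groupByKey()}} needs no repartition topic, and the aggregator simply returns the newest value.

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KTable, Materialized, Serialized, ValueMapper}
import org.apache.kafka.streams.state.Stores

object ExistsTableWithoutTopic {
  // Hypothetical variant of buildTable(): no to(), no entity-X-exists topic.
  def buildTable[V](builder: StreamsBuilder,
                    sourceTopic: String,
                    storeName: String,
                    valueSerde: Serde[V],
                    valueMapper: ValueMapper[String, V]): KTable[String, V] = {
    // The initial aggregate is never used: the aggregator always returns the new value.
    val initializer = new Initializer[V] {
      override def apply(): V = null.asInstanceOf[V]
    }
    val keepLatest = new Aggregator[String, V, V] {
      override def apply(key: String, value: V, aggregate: V): V = value
    }
    builder
      .stream[String, String](sourceTopic) // default (String) serdes, as in the report
      .mapValues[V](valueMapper)           // key unchanged, so no repartitioning
      .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
      .aggregate[V](initializer, keepLatest,
        Materialized.as[String, V](Stores.inMemoryKeyValueStore(storeName))
          .withKeySerde(Serdes.String())
          .withValueSerde(valueSerde))
  }
}
```

The same null-value caveat applies here: {{mapToEmptyString}} maps a null value to null, and such records are dropped by the aggregation rather than deleting the key.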
It would be great if you could confirm whether this workaround works. Thanks.
> KTable state restore fails after rebalance
> ------------------------------------------
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Andreas Schroeder
> Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following Kafka Streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, valuemapper, and joiner code omitted):
> {code}
> import org.apache.kafka.common.serialization.{Serde, Serdes}
> import org.apache.kafka.streams.StreamsBuilder
> import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced, ValueJoiner, ValueMapper}
> import org.apache.kafka.streams.state.Stores
>
> // Maps sourceTopic into existsTopic, then reads existsTopic back as a KTable
> // backed by an in-memory store with change logging disabled.
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   // Explicit type arguments: without them Scala infers Nothing for K and V here.
>   val materialized = Materialized.as[String, V](Stores.inMemoryKeyValueStore(inMemoryStoreName))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(valueSerde)
>     .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
>
> val builder = new StreamsBuilder
> // Maps every non-null value to "" (only key existence matters); null stays null.
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder, "entity-B", "entity-B-exists", EntityBInfoSerde, ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> // The join result store keeps change logging enabled (default topic configs).
> val materialized = Materialized.as[String, EntityDiff](Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each of the four consumed topics (entity-A, entity-B, entity-A-exists, entity-B-exists) has 30 partitions, giving a total of 4 x 30 = 120 partitions to consume. The initial launch of the processor works fine, but killing one processor and letting it rejoin leads to faulty behaviour in the stream threads.
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition/task assignment seems to assign the same task to several stream threads.
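The conclusion above is a pigeonhole argument: if the per-thread assignment counts add up to more than the 120 distinct partitions, some partitions must be owned by more than one thread. A small plain-Scala helper for spotting them from a thread-to-partitions map (hypothetical names; a partition is modelled as a {{(topic, partition)}} pair rather than Kafka's {{TopicPartition}} class):

```scala
object AssignmentCheck {
  // Hypothetical model: a topic-partition is a (topic name, partition number) pair.
  type TopicPartition = (String, Int)

  // Partitions that appear in more than one thread's assignment, mapped to the
  // number of threads that own them.
  def duplicates(assignment: Map[String, Set[TopicPartition]]): Map[TopicPartition, Int] =
    assignment.values.toSeq
      .flatMap(_.toSeq)
      .groupBy(identity)
      .map { case (tp, owners) => tp -> owners.size }
      .filter { case (_, count) => count > 1 }

  // Lower bound on duplicated assignments: every assignment beyond the number of
  // distinct partitions must be an additional owner of some partition.
  def surplus(totalAssigned: Int, distinctPartitions: Int): Int =
    math.max(0, totalAssigned - distinctPartitions)
}
```

With the numbers from the report, 157 assignments imply at least 37 duplicated ones, and 132 imply at least 12.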
> The processor machines trying to rejoin the consumer group constantly fail with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
>     StreamsTask taskId: 1_0
>         ProcessorTopology:
>             KSTREAM-SOURCE-0000000008:
>                 topics: [entity-A-exists]
>                 children: [KTABLE-SOURCE-0000000009]
>             KTABLE-SOURCE-0000000009:
>                 states: [entity-A-exists-persisted]
>                 children: [KTABLE-JOINTHIS-0000000011]
>             KTABLE-JOINTHIS-0000000011:
>                 states: [entity-B-exists-persisted]
>                 children: [KTABLE-MERGE-0000000010]
>             KTABLE-MERGE-0000000010:
>                 states: [entity-A-joined-with-entity-B]
>             KSTREAM-SOURCE-0000000003:
>                 topics: [entity-B-exists]
>                 children: [KTABLE-SOURCE-0000000004]
>             KTABLE-SOURCE-0000000004:
>                 states: [entity-B-exists-persisted]
>                 children: [KTABLE-JOINOTHER-0000000012]
>             KTABLE-JOINOTHER-0000000012:
>                 states: [entity-A-exists-persisted]
>                 children: [KTABLE-MERGE-0000000010]
>             KTABLE-MERGE-0000000010:
>                 states: [entity-A-joined-with-entity-B]
>         Partitions [entity-A-exists-0, entity-B-exists-0]
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> This one surprises me: the KTable state store entity-B-exists-persisted is rebuilt from entity-B-exists, which can of course change while the rebuild is happening, since the topic entity-B-exists is fed by another stream thread.
> Another one, very similar:
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-A-exists-24 should not change while restoring: old end offset 6483978, current offset 6485108
>     StreamsTask taskId: 1_24
>         ProcessorTopology:
>             KSTREAM-SOURCE-0000000008:
>                 topics: [entity-A-exists]
>                 children: [KTABLE-SOURCE-0000000009]
>             KTABLE-SOURCE-0000000009:
>                 states: [entity-A-exists-persisted]
>                 children: [KTABLE-JOINTHIS-0000000011]
>             KTABLE-JOINTHIS-0000000011:
>                 states: [entity-B-exists-persisted]
>                 children: [KTABLE-MERGE-0000000010]
>             KTABLE-MERGE-0000000010:
>                 states: [entity-A-joined-with-entity-B]
>             KSTREAM-SOURCE-0000000003:
>                 topics: [entity-B-exists]
>                 children: [KTABLE-SOURCE-0000000004]
>             KTABLE-SOURCE-0000000004:
>                 states: [entity-B-exists-persisted]
>                 children: [KTABLE-JOINOTHER-0000000012]
>             KTABLE-JOINOTHER-0000000012:
>                 states: [entity-A-exists-persisted]
>                 children: [KTABLE-MERGE-0000000010]
>             KTABLE-MERGE-0000000010:
>                 states: [entity-A-joined-with-entity-B]
>         Partitions [entity-A-exists-24, entity-B-exists-24]
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> Again, the topic entity-A-exists is fed by another stream thread.
> We saw around 60000 such errors per minute, as the stream threads continuously try to recover and fail.
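Both errors say the log end offset "should not change while restoring", yet here the topic being restored from (entity-A-exists / entity-B-exists) is one that another stream thread keeps appending to. The following is a hypothetical, heavily simplified model in plain Scala (it is not Kafka's {{StoreChangelogReader}} code; {{PartitionLog}}, {{restore}} and {{EndOffsetChanged}} are invented for illustration) showing why a guard that captures the end offset once and re-checks it after every batch fails as soon as a concurrent writer exists.

```scala
import scala.collection.mutable.ArrayBuffer

object RestoreGuardModel {
  // Hypothetical stand-in for one topic partition: an append-only list of
  // records, where a record's offset is its index.
  final class PartitionLog {
    private val records = ArrayBuffer.empty[(String, String)]
    def append(key: String, value: String): Unit = records += (key -> value)
    def endOffset: Long = records.size.toLong
    def read(from: Long, max: Int): List[(String, String)] =
      records.slice(from.toInt, from.toInt + max).toList
  }

  final class EndOffsetChanged(message: String) extends RuntimeException(message)

  // Replays `log` into a key-value map in batches. The end offset is captured
  // once up front and re-checked after every batch; any change aborts the restore.
  // `betweenBatches` lets a caller simulate another thread writing concurrently.
  def restore(log: PartitionLog, batchSize: Int)(betweenBatches: () => Unit): Map[String, String] = {
    val expectedEnd = log.endOffset
    var position = 0L
    var state = Map.empty[String, String]
    while (position < expectedEnd) {
      val batch = log.read(position, batchSize)
      batch.foreach { case (k, v) => state += (k -> v) }
      position += batch.size
      betweenBatches()
      val currentEnd = log.endOffset
      if (currentEnd != expectedEnd)
        throw new EndOffsetChanged(
          s"end offset should not change while restoring: old end offset $expectedEnd, current offset $currentEnd")
    }
    state
  }
}
```

In this model a log written only by its owner (which is not processing while it restores) always passes, while a log fed by another thread fails on the first batch, which matches the endless retry loop described above.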