Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2022/02/04 02:31:50 UTC

Re: Error trimming topology

Hello Murilo,

Could you elaborate a bit more on how you "trimmed" the topology? For
example:

1) Did you change the code so that only 6 sub-topologies will be built, and
their sub-topology ids stay the same? I.e., did you just trim the last 3
sub-topologies, with ids 6, 7, and 8? (One way to check this is sketched below.)
2) Did you delete the local state dir for those sub-topologies?
3) Did you delete all the repartition/changelog topics for those
sub-topologies?
4) For the remaining 6 sub-topologies, do their state store names and
topic names all remain the same? I understand that they have static
names, but do they have numerical suffixes that get changed?
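
To double-check 1) and 4), one handy trick is to print the topology description and
diff the sub-topology ids and store names before and after the change. Here is a
minimal sketch; the class name and buildTopology() are just placeholders for your
own topology construction code:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

public class DescribeTopology {
    public static void main(String[] args) {
        Topology topology = buildTopology(); // placeholder: build your real topology here
        TopologyDescription description = topology.describe();

        // Each sub-topology carries the numeric id that also prefixes its task ids,
        // e.g. task 7_0 is partition 0 of sub-topology 7.
        for (TopologyDescription.Subtopology sub : description.subtopologies()) {
            System.out.println("Sub-topology " + sub.id());
            for (TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Processor) {
                    // Processor nodes expose the names of the state stores they use.
                    System.out.println("  " + node.name() + " stores="
                            + ((TopologyDescription.Processor) node).stores());
                } else {
                    System.out.println("  " + node.name());
                }
            }
        }
    }

    private static Topology buildTopology() {
        // Placeholder only; return your actual topology instead.
        return new StreamsBuilder().build();
    }
}

If the trimmed topology shifts that numbering (or renames a store), the task metadata
committed under the old numbering will no longer line up with the new topology, which
is the kind of mismatch the "unknown task 7_0" error below points at.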

On Tue, Jan 25, 2022 at 6:43 PM Murilo Tavares <mu...@gmail.com> wrote:

> Hi
> I have a KafkaStreams application that is too heavyweight, with 9
> sub-topologies.
> I am trying to disable an unneeded part of the topology that is
> completely independent of the rest. Since my state stores
> have fixed, predictable names, I compared the topologies and I believe it
> should be safe to trim some sub-topologies.
> After trimming the unused ones, it now has 6 sub-topologies.
> Nevertheless, the application won't start. It seems to be trying to recover
> previous tasks that shouldn't exist anymore.
> I kept the application down for 30 min so that any timeouts, such as session
> or polling timeouts, could expire, but still, when the application starts, it
> reads the task states from somewhere and fails to recover them...
> Here's the log (note the "unknown task 7_0", which makes sense since the
> number of sub-topologies fell from 9 to 6):
>
> 2022-01-26 02:28:17.552 [asdasdasd-StreamThread-1] INFO
>  org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor
> - Decided on assignment:
> {1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1,
> 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])
> prevActiveTasks: ([]) prevStandbyTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1,
> 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1, 7_0, 7_1, 8_0, 8_1, 9_0, 9_1])
> changelogOffsetTotalsByTask: ([0_0=1244818, 0_1=625988, 1_0=15255,
> 1_1=64645, 2_0=670938, 2_1=100636, 3_0=6379662, 3_1=5600072, 4_0=2362,
> 4_1=15224, 5_0=19577, 5_1=113994, 6_0=7403980, 6_1=9195079, 7_0=226722,
> 7_1=76623, 8_0=7334, 8_1=66344, 9_0=0, 9_1=39]) taskLagTotals: ([0_0=3,
> 0_1=3, 1_0=1, 1_1=1, 2_0=1, 2_1=1, 3_0=7, 3_1=7, 4_0=1, 4_1=1, 5_0=1,
> 5_1=1, 6_0=1, 6_1=1]) capacity: 1 assigned: 14]} with no followup probing
> rebalance.
> 2022-01-26 02:28:17.558 [asdasdasd-StreamThread-1] INFO
>  org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor -
> stream-thread [asdasdasd-StreamThread-1-consumer] Assigned tasks [0_0, 0_1,
> 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1] including
> stateful [0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0,
> 6_1] to clients as:
> 1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1,
> 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])].
> 2022-01-26 02:28:17.566 [asdasdasd-StreamThread-1] INFO
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> [Consumer instanceId=asdasdasd-1,
> clientId=asdasdasd-StreamThread-1-consumer, groupId=inventory-assembler-4]
> Rebalance failed.
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 7_0
> at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318) ~[app.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor$$Lambda$534.00000000F9BC5F20.applyAsLong(Unknown Source) ~[?:?]
> at java.util.Comparator.lambda$comparingLong$6043328a$1(Unknown Source) ~[?:?]
> at java.util.Comparator$$Lambda$535.00000000F9B1D820.compare(Unknown Source) ~[?:?]
> at java.util.Comparator.lambda$thenComparing$36697e65$1(Unknown Source) ~[?:?]
> at java.util.Comparator$$Lambda$399.00000000FEBE6620.compare(Unknown Source) ~[?:?]
> at java.util.TreeMap.put(Unknown Source) ~[?:?]
> at java.util.TreeSet.add(Unknown Source) ~[?:?]
> at java.util.AbstractCollection.addAll(Unknown Source) ~[?:?]
> at java.util.TreeSet.addAll(Unknown Source) ~[?:?]
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1205) ~[app.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1119) ~[app.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:845) ~[app.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:405) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589) [app.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[app.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296) [app.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) [app.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [app.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) [app.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) [app.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [app.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [app.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [app.jar:?]
>
> Any idea how I could overcome this? I even tried to change
> ConsumerConfig.GROUP_INSTANCE_ID_CONFIG and StreamsConfig.CLIENT_ID_CONFIG,
> but that didn't seem to work...
>
> Thanks
> Murilo
>
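
Side note on the GROUP_INSTANCE_ID_CONFIG / CLIENT_ID_CONFIG attempt above: in
Streams, the consumer group.id and everything keyed to it (committed offsets, the
internal repartition/changelog topic names, the local state directory) come from
StreamsConfig.APPLICATION_ID_CONFIG, while client.id only changes how the clients
name themselves and group.instance.id only controls static membership identity.
A minimal sketch of how these configs relate; the class name is illustrative,
"localhost:9092" is a placeholder, and the other values are taken from the log above:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class ConfigSketch {
    public static KafkaStreams create(Topology topology) {
        Properties props = new Properties();
        // application.id doubles as the consumer group.id ("inventory-assembler-4"
        // in the log above) and prefixes the internal topics and the state dir.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-assembler-4");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // client.id only affects client names in logs and metrics; it does not
        // change which group, local state, or changelog topics are used.
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "asdasdasd");
        return new KafkaStreams(topology, props);
    }
}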


-- 
-- Guozhang