Posted to users@kafka.apache.org by Murilo Tavares <mu...@gmail.com> on 2022/01/26 02:42:41 UTC

Error trimming topology

Hi
I have a KafkaStreams application that is too heavyweight, with 9
sub-topologies.
I am trying to disable an unneeded part of the topology that is completely
independent of the rest. Since my state stores have fixed, predictable
names, I compared the topologies and I believe it should be safe to trim
some sub-topologies.
After trimming the unused ones, the application now has 6 sub-topologies.
Nevertheless, it won't start: it seems to be trying to recover previous
tasks that shouldn't exist anymore.
I left the application down for 30 minutes so that any timeouts, such as
session or polling timeouts, could expire, but when the application starts
it still reads the old task states from somewhere and fails to recover
them...
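
To make the "trimming" concrete, here is a minimal sketch of the kind of
conditional wiring I mean (the topic and store names below are made-up
placeholders, not my real topology): the independent sub-topology is only
added when a flag is set, and every store is named explicitly.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;

public class TopologyFactory {

    // Builds the topology; the optional, fully independent part is only
    // wired in when the flag is set. All names are placeholders.
    public static Topology build(boolean includeOptionalPart) {
        StreamsBuilder builder = new StreamsBuilder();

        // Core sub-topology with an explicitly named state store.
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .count(Materialized.as("orders-count-store"));

        if (includeOptionalPart) {
            // Independent sub-topology that can be trimmed away.
            builder.stream("audit", Consumed.with(Serdes.String(), Serdes.String()))
                   .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                   .count(Materialized.as("audit-count-store"));
        }
        return builder.build();
    }
}
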
Here's the log (note the "unknown task 7_0", which makes sense since the
number of sub-topologies fell from 9 to 6):

2022-01-26 02:28:17.552 [asdasdasd-StreamThread-1] INFO
 org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor
- Decided on assignment:
{1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1,
2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])
prevActiveTasks: ([]) prevStandbyTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1,
3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1, 7_0, 7_1, 8_0, 8_1, 9_0, 9_1])
changelogOffsetTotalsByTask: ([0_0=1244818, 0_1=625988, 1_0=15255,
1_1=64645, 2_0=670938, 2_1=100636, 3_0=6379662, 3_1=5600072, 4_0=2362,
4_1=15224, 5_0=19577, 5_1=113994, 6_0=7403980, 6_1=9195079, 7_0=226722,
7_1=76623, 8_0=7334, 8_1=66344, 9_0=0, 9_1=39]) taskLagTotals: ([0_0=3,
0_1=3, 1_0=1, 1_1=1, 2_0=1, 2_1=1, 3_0=7, 3_1=7, 4_0=1, 4_1=1, 5_0=1,
5_1=1, 6_0=1, 6_1=1]) capacity: 1 assigned: 14]} with no followup probing
rebalance.
2022-01-26 02:28:17.558 [asdasdasd-StreamThread-1] INFO
 org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor -
stream-thread [asdasdasd-StreamThread-1-consumer] Assigned tasks [0_0, 0_1,
1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1] including
stateful [0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0,
6_1] to clients as:
1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1,
2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])].
2022-01-26 02:28:17.566 [asdasdasd-StreamThread-1] INFO
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
[Consumer instanceId=asdasdasd-1,
clientId=asdasdasd-StreamThread-1-consumer, groupId=inventory-assembler-4]
Rebalance failed.
java.lang.IllegalStateException: Tried to lookup lag for unknown task 7_0
    at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor$$Lambda$534.00000000F9BC5F20.applyAsLong(Unknown Source) ~[?:?]
    at java.util.Comparator.lambda$comparingLong$6043328a$1(Unknown Source) ~[?:?]
    at java.util.Comparator$$Lambda$535.00000000F9B1D820.compare(Unknown Source) ~[?:?]
    at java.util.Comparator.lambda$thenComparing$36697e65$1(Unknown Source) ~[?:?]
    at java.util.Comparator$$Lambda$399.00000000FEBE6620.compare(Unknown Source) ~[?:?]
    at java.util.TreeMap.put(Unknown Source) ~[?:?]
    at java.util.TreeSet.add(Unknown Source) ~[?:?]
    at java.util.AbstractCollection.addAll(Unknown Source) ~[?:?]
    at java.util.TreeSet.addAll(Unknown Source) ~[?:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1205) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1119) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:845) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:405) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589) [app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296) [app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) [app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [app.jar:?]

Any idea how I could overcome this? I even tried changing
ConsumerConfig.GROUP_INSTANCE_ID_CONFIG and StreamsConfig.CLIENT_ID_CONFIG,
but that didn't seem to work...
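
For reference, this is roughly how those configs are being set (a minimal
sketch; the bootstrap server is a placeholder and the ids are the ones that
appear in the log above):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {

    // Placeholder values for illustration.
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-assembler-4");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The Streams client.id is used as a prefix for the embedded clients.
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "asdasdasd");
        // group.instance.id (static membership) is a consumer config, so it
        // is passed to the main consumer via the consumer prefix.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), "asdasdasd");
        return props;
    }
}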

Thanks
Murilo

Re: Error trimming topology

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Murilo,

Could you elaborate a bit more on how you "trimmed" the topology? For
example:

1) Did you change the code so that only 6 sub-topologies will be built, and
their sub-topology ids stay the same? I.e., did you just trim the last 3
sub-topologies, with ids 6, 7, 8?
2) Did you delete the local state dir for those sub-topologies?
3) Did you delete all the repartition/changelog topics for those
sub-topologies?
4) For the remaining 6 sub-topologies, do their state store names and
topic names all remain the same? --- I understand that they have static
names, but do they have numerical suffixes that get changed?
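
One way to answer 1) and 4) is to print the topology description of the old
and the new build and diff the sub-topology ids, store names and topic
names; a minimal sketch:

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

public class TopologyDiff {

    // Prints every sub-topology id together with its processor node names,
    // so the output of the old and the trimmed build can be compared.
    public static void printSubtopologies(Topology topology) {
        TopologyDescription description = topology.describe();
        for (TopologyDescription.Subtopology sub : description.subtopologies()) {
            System.out.println("sub-topology " + sub.id() + ":");
            for (TopologyDescription.Node node : sub.nodes()) {
                System.out.println("  " + node.name());
            }
        }
    }
}

Regarding 2), note that KafkaStreams#cleanUp(), called while the instance is
not running, wipes the instance's local state directory for the application
id; that is a blunt but simple way to drop local state for sub-topologies
that no longer exist.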



-- 
-- Guozhang