Posted to users@kafka.apache.org by Håkon Åmdal <ha...@gmail.com> on 2017/09/05 14:00:14 UTC

Adding or removing input topics to a Kafka Consumer without downtime

Hi there,

In our company, we run multiple Kafka Streams apps that are deployed
in a red/black fashion. A deployment means starting a new set of hosts,
running them in parallel with the old hosts until they pass the health
check, and then scaling down the old hosts. This approach works very well,
as we can deploy application changes without any downtime.

However, as far as I understand, there is no way to run a new and an old
version in parallel if they don’t consume from the same set of topics.
Consumers will try to rebalance tasks between the two application
versions, causing errors like the one below:

Uncaught exception: Thread xyz-StreamThread-1 stopped unexpectedly after
Assigned partition foo-1 for non-subscribed topic regex pattern;
subscription pattern is bar
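
The message above reads like the result of a member checking each partition it was handed against its own subscription pattern. The following is a minimal Java sketch of that kind of check, written as a simplified model and not as Kafka's actual code; the class name SubscriptionCheck and the method validateAssignment are hypothetical, and the topic names foo and bar are taken from the error message.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Simplified model, NOT Kafka's implementation: a group member validates
// every partition it was assigned against its own subscription pattern.
public class SubscriptionCheck {

    // Hypothetical helper: throws if an assigned topic does not match the pattern.
    static void validateAssignment(Pattern subscription, List<String> assigned) {
        for (String topicPartition : assigned) {
            // "foo-1" is split into topic "foo" and partition "1"
            String topic = topicPartition.substring(0, topicPartition.lastIndexOf('-'));
            if (!subscription.matcher(topic).matches()) {
                throw new IllegalArgumentException("Assigned partition " + topicPartition
                        + " for non-subscribed topic regex pattern; subscription pattern is "
                        + subscription.pattern());
            }
        }
    }

    public static void main(String[] args) {
        // The old version only knows about topic "bar".
        Pattern oldVersion = Pattern.compile("bar");
        validateAssignment(oldVersion, Arrays.asList("bar-0"));
        try {
            // A partition of "foo", a topic only the new version subscribes to.
            validateAssignment(oldVersion, Arrays.asList("foo-1"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this model, the old instances fail as soon as the group assigns them a partition of a topic that only the new version's pattern matches.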

To mitigate the problem, we have so far scaled the old cluster down to 0
instances before deploying the new application whenever we add or remove
input topics. However, this causes service downtime, which we can no longer
accept.

I’m curious to hear whether anyone is experiencing the same issues, or
whether anyone has any thoughts or opinions. Are we doing something wrong,
or is this something that can be solved by the Kafka Consumer client?

Thanks,

Håkon

Re: Adding or removing input topics to a Kafka Consumer without downtime

Posted by Håkon Åmdal <ha...@gmail.com>.
Could this be something that is only relevant to Streams apps, then?

I've also tried to add a join, but I'm getting the same issues with
repartitioning the state store. I've pasted some stack traces below.

19:14:54 ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided
listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
for group Jord failed on partition assignment
org.apache.kafka.streams.errors.StreamsException: Store
KSTREAM-JOINTHIS-0000000017-store's change log
(Jord-KSTREAM-JOINTHIS-0000000017-store-changelog) does not contain
partition 16
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
	at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:110)
	at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:72)
	at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:65)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:95)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)


19:14:54 ERROR c.s.s.data.yggdrasil.Yggdrasil - Uncaught exception:
Thread Jord-f5f1364e-feb5-445e-a61c-c36b15b76605-StreamThread-5
stopped unexpectedly after stream-thread
[Jord-f5f1364e-feb5-445e-a61c-c36b15b76605-StreamThread-5] Failed to
rebalance.
org.apache.kafka.streams.errors.StreamsException: stream-thread
[Jord-f5f1364e-feb5-445e-a61c-c36b15b76605-StreamThread-5] Failed to
rebalance.
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:589)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: Store
KSTREAM-JOINTHIS-0000000017-store's change log
(Jord-KSTREAM-JOINTHIS-0000000017-store-changelog) does not contain
partition 17
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
	at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:110)
	at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:72)
	at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:65)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:95)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
	... 2 common frames omitted


On Tue, Sep 5, 2017 at 23:02, Jeff Widman <je...@jeffwidman.com> wrote:

> I haven't played with Streams apps, but I know that at the consumer group
> level, consumers that are part of the same group can have different
> subscriptions. So at the consumer group level, at least, a rolling upgrade
> is theoretically possible (and in fact it was purposefully designed to
> support this use case).

Re: Adding or removing input topics to a Kafka Consumer without downtime

Posted by Jeff Widman <je...@jeffwidman.com>.
I haven't played with Streams apps, but I know that at the consumer group
level, consumers that are part of the same group can have different
subscriptions. So at the consumer group level, at least, a rolling upgrade
is theoretically possible (and in fact it was purposefully designed to
support this use case).
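
To make the point about differing subscriptions concrete, here is a minimal Java sketch of a subscription-aware assignment: each topic's partitions are spread only over the members that subscribe to that topic. This is a simplified model and not Kafka's assignor code; the class name MixedSubscriptionAssignment, the method assign, and the member names "old" and "new" are hypothetical. Kafka Streams plugs in its own partition assignor, so this does not by itself show that a Streams app tolerates mixed subscriptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified model, NOT Kafka's assignor: a topic's partitions go only to
// the members of the group that subscribe to that topic.
public class MixedSubscriptionAssignment {

    static Map<String, List<String>> assign(Map<String, Set<String>> subscriptions,
                                            Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            result.put(member, new ArrayList<>());
        }
        // Sorted iteration keeps the outcome deterministic.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            List<String> subscribers = new ArrayList<>();
            for (Map.Entry<String, Set<String>> s : new TreeMap<>(subscriptions).entrySet()) {
                if (s.getValue().contains(topic.getKey())) {
                    subscribers.add(s.getKey());
                }
            }
            if (subscribers.isEmpty()) {
                continue; // nobody in the group subscribes to this topic
            }
            // Round-robin the topic's partitions over its subscribers only.
            for (int p = 0; p < topic.getValue(); p++) {
                String owner = subscribers.get(p % subscribers.size());
                result.get(owner).add(topic.getKey() + "-" + p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subscriptions = new TreeMap<>();
        subscriptions.put("old", new HashSet<>(Arrays.asList("bar")));
        subscriptions.put("new", new HashSet<>(Arrays.asList("bar", "foo")));

        Map<String, Integer> partitionsPerTopic = new TreeMap<>();
        partitionsPerTopic.put("bar", 2);
        partitionsPerTopic.put("foo", 2);

        // The "old" member never receives a partition of "foo".
        System.out.println(assign(subscriptions, partitionsPerTopic));
    }
}
```

In this model the old and new versions can share a group during a rolling upgrade, because no member is ever handed a topic outside its own subscription.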




-- 

*Jeff Widman*
jeffwidman.com <http://www.jeffwidman.com/> | 740-WIDMAN-J (943-6265)
<><