You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by D Stephan <ka...@gmail.com> on 2017/06/27 07:52:21 UTC

Kafka Stream invalid partitions

Hello,

When I use KafkaStreams DSL GroupByKey and Aggregate APIs, I have randomly
& frequently below exceptions:
In my opinion, it is not practical to clean up the invalid partitions
everydays.  For your information, this partition is an internal partition
that automatically created by KafkaStream Aggregate API.
Dou you have any idea or workarounds to mitigate this exception?




2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [
StreamThread-4] o.a.k.s.p.i.InternalTopicManager :
Could not create internal topics: Existing internal topic
external-batch-request-store-repartition has invalid partitions.
Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean up
invalid topics before processing. Retry #4

2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4"
org.apache.kafka.streams.errors.StreamsException: Could not create internal
topics.
2017-06-21T06:48:31.491087557Z at
org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:70)
2017-06-21T06:48:31.491091661Z at
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:618)
2017-06-21T06:48:31.491096794Z at
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:372)
2017-06-21T06:48:31.491368662Z at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339)
2017-06-21T06:48:31.491390576Z at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488)
2017-06-21T06:48:31.491397476Z at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
2017-06-21T06:48:31.491403757Z at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
2017-06-21T06:48:31.491408328Z at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
2017-06-21T06:48:31.491413053Z at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)

Re: Kafka Stream invalid partitions

Posted by D Stephan <ka...@gmail.com>.
Thanks Eno, for the info!.  I will try your suggestion.

2017-06-27 14:04 GMT+02:00 Eno Thereska <en...@gmail.com>:

> Thanks. I believe we’ve addressed this issue in 0.10.2.1, any chance you
> could try that?
>
> Thanks
> Eno
> > On Jun 27, 2017, at 11:14 AM, D Stephan <ka...@gmail.com> wrote:
> >
> > Hello,
> >
> > Thanks for your reply.
> >
> > I use Kafka & KafkaStream version 0.10.2.0.
> > Between the runs, the number of partitions are not intentionally changed
> > programmatically or manually.
> >
> > This topic:  "external-batch-request-store-repartition" is an internally
> > generated topic from this KafkaStream DSL
> > "aggregate"
> > https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/
> KGroupedStream.html#aggregate(org.apache.kafka.streams.
> kstream.Initializer,%20org.apache.kafka.streams.kstream.
> Aggregator,%20org.apache.kafka.streams.kstream.Windows,
> %20org.apache.kafka.common.serialization.Serde,%20java.lang.String)
> >
> >
> >
> > I use this API as follows:
> >
> > ...
> > .groupByKey()
> > .aggregate(...)
> > .toStream(...);
> >
> >
> > Please let me know if you need addiotional information.
> >
> > Thanks,
> >
> >
> > 2017-06-27 11:39 GMT+02:00 Eno Thereska <en...@gmail.com>:
> >
> >> Hi there,
> >>
> >> Thanks for the report. What version of Kafka are you using? Also,
> between
> >> runs do you change the number of partitions for your topics? I’m trying
> to
> >> figure out how this problem happens, any information on what is
> changing in
> >> between runs is appreciated.
> >>
> >> Thanks,
> >> Eno
> >>
> >>> On Jun 27, 2017, at 8:52 AM, D Stephan <ka...@gmail.com> wrote:
> >>>
> >>> Hello,
> >>>
> >>> When I use KafkaStreams DSL GroupByKey and Aggregate APIs, I have
> >> randomly
> >>> & frequently below exceptions:
> >>> In my opinion, it is not practical to clean up the invalid partitions
> >>> everydays.  For your information, this partition is an internal
> partition
> >>> that automatically created by KafkaStream Aggregate API.
> >>> Dou you have any idea or workarounds to mitigate this exception?
> >>>
> >>>
> >>>
> >>>
> >>> 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [
> >>> StreamThread-4] o.a.k.s.p.i.InternalTopicManager :
> >>> Could not create internal topics: Existing internal topic
> >>> external-batch-request-store-repartition has invalid partitions.
> >>> Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean
> >> up
> >>> invalid topics before processing. Retry #4
> >>>
> >>> 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4"
> >>> org.apache.kafka.streams.errors.StreamsException: Could not create
> >> internal
> >>> topics.
> >>> 2017-06-21T06:48:31.491087557Z at
> >>> org.apache.kafka.streams.processor.internals.InternalTopicManager.
> >> makeReady(InternalTopicManager.java:70)
> >>> 2017-06-21T06:48:31.491091661Z at
> >>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
> >> prepareTopic(StreamPartitionAssignor.java:618)
> >>> 2017-06-21T06:48:31.491096794Z at
> >>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
> >> assign(StreamPartitionAssignor.java:372)
> >>> 2017-06-21T06:48:31.491368662Z at
> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> >> performAssignment(ConsumerCoordinator.java:339)
> >>> 2017-06-21T06:48:31.491390576Z at
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> >> onJoinLeader(AbstractCoordinator.java:488)
> >>> 2017-06-21T06:48:31.491397476Z at
> >>> org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator.access$
> >> 1100(AbstractCoordinator.java:89)
> >>> 2017-06-21T06:48:31.491403757Z at
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> >> JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
> >>> 2017-06-21T06:48:31.491408328Z at
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> >> JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
> >>> 2017-06-21T06:48:31.491413053Z at
> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> >> CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
> >>
> >>
>
>

Re: Kafka Stream invalid partitions

Posted by Eno Thereska <en...@gmail.com>.
Thanks. I believe we’ve addressed this issue in 0.10.2.1, any chance you could try that?

Thanks
Eno
> On Jun 27, 2017, at 11:14 AM, D Stephan <ka...@gmail.com> wrote:
> 
> Hello,
> 
> Thanks for your reply.
> 
> I use Kafka & KafkaStream version 0.10.2.0.
> Between the runs, the number of partitions are not intentionally changed
> programmatically or manually.
> 
> This topic:  "external-batch-request-store-repartition" is an internally
> generated topic from this KafkaStream DSL
> "aggregate"
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Windows,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)
> 
> 
> 
> I use this API as follows:
> 
> ...
> .groupByKey()
> .aggregate(...)
> .toStream(...);
> 
> 
> Please let me know if you need addiotional information.
> 
> Thanks,
> 
> 
> 2017-06-27 11:39 GMT+02:00 Eno Thereska <en...@gmail.com>:
> 
>> Hi there,
>> 
>> Thanks for the report. What version of Kafka are you using? Also, between
>> runs do you change the number of partitions for your topics? I’m trying to
>> figure out how this problem happens, any information on what is changing in
>> between runs is appreciated.
>> 
>> Thanks,
>> Eno
>> 
>>> On Jun 27, 2017, at 8:52 AM, D Stephan <ka...@gmail.com> wrote:
>>> 
>>> Hello,
>>> 
>>> When I use KafkaStreams DSL GroupByKey and Aggregate APIs, I have
>> randomly
>>> & frequently below exceptions:
>>> In my opinion, it is not practical to clean up the invalid partitions
>>> everydays.  For your information, this partition is an internal partition
>>> that automatically created by KafkaStream Aggregate API.
>>> Dou you have any idea or workarounds to mitigate this exception?
>>> 
>>> 
>>> 
>>> 
>>> 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [
>>> StreamThread-4] o.a.k.s.p.i.InternalTopicManager :
>>> Could not create internal topics: Existing internal topic
>>> external-batch-request-store-repartition has invalid partitions.
>>> Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean
>> up
>>> invalid topics before processing. Retry #4
>>> 
>>> 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4"
>>> org.apache.kafka.streams.errors.StreamsException: Could not create
>> internal
>>> topics.
>>> 2017-06-21T06:48:31.491087557Z at
>>> org.apache.kafka.streams.processor.internals.InternalTopicManager.
>> makeReady(InternalTopicManager.java:70)
>>> 2017-06-21T06:48:31.491091661Z at
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
>> prepareTopic(StreamPartitionAssignor.java:618)
>>> 2017-06-21T06:48:31.491096794Z at
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
>> assign(StreamPartitionAssignor.java:372)
>>> 2017-06-21T06:48:31.491368662Z at
>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
>> performAssignment(ConsumerCoordinator.java:339)
>>> 2017-06-21T06:48:31.491390576Z at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>> onJoinLeader(AbstractCoordinator.java:488)
>>> 2017-06-21T06:48:31.491397476Z at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$
>> 1100(AbstractCoordinator.java:89)
>>> 2017-06-21T06:48:31.491403757Z at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
>> JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
>>> 2017-06-21T06:48:31.491408328Z at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
>> JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
>>> 2017-06-21T06:48:31.491413053Z at
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
>> CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
>> 
>> 


Re: Kafka Stream invalid partitions

Posted by D Stephan <ka...@gmail.com>.
Hello,

Thanks for your reply.

I use Kafka & KafkaStream version 0.10.2.0.
Between the runs, the number of partitions are not intentionally changed
programmatically or manually.

This topic:  "external-batch-request-store-repartition" is an internally
generated topic from this KafkaStream DSL
"aggregate"
https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Windows,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)



I use this API as follows:

...
.groupByKey()
.aggregate(...)
.toStream(...);


Please let me know if you need addiotional information.

Thanks,


2017-06-27 11:39 GMT+02:00 Eno Thereska <en...@gmail.com>:

> Hi there,
>
> Thanks for the report. What version of Kafka are you using? Also, between
> runs do you change the number of partitions for your topics? I’m trying to
> figure out how this problem happens, any information on what is changing in
> between runs is appreciated.
>
> Thanks,
> Eno
>
> > On Jun 27, 2017, at 8:52 AM, D Stephan <ka...@gmail.com> wrote:
> >
> > Hello,
> >
> > When I use KafkaStreams DSL GroupByKey and Aggregate APIs, I have
> randomly
> > & frequently below exceptions:
> > In my opinion, it is not practical to clean up the invalid partitions
> > everydays.  For your information, this partition is an internal partition
> > that automatically created by KafkaStream Aggregate API.
> > Dou you have any idea or workarounds to mitigate this exception?
> >
> >
> >
> >
> > 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [
> > StreamThread-4] o.a.k.s.p.i.InternalTopicManager :
> > Could not create internal topics: Existing internal topic
> > external-batch-request-store-repartition has invalid partitions.
> > Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean
> up
> > invalid topics before processing. Retry #4
> >
> > 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4"
> > org.apache.kafka.streams.errors.StreamsException: Could not create
> internal
> > topics.
> > 2017-06-21T06:48:31.491087557Z at
> > org.apache.kafka.streams.processor.internals.InternalTopicManager.
> makeReady(InternalTopicManager.java:70)
> > 2017-06-21T06:48:31.491091661Z at
> > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
> prepareTopic(StreamPartitionAssignor.java:618)
> > 2017-06-21T06:48:31.491096794Z at
> > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.
> assign(StreamPartitionAssignor.java:372)
> > 2017-06-21T06:48:31.491368662Z at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> performAssignment(ConsumerCoordinator.java:339)
> > 2017-06-21T06:48:31.491390576Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> onJoinLeader(AbstractCoordinator.java:488)
> > 2017-06-21T06:48:31.491397476Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$
> 1100(AbstractCoordinator.java:89)
> > 2017-06-21T06:48:31.491403757Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
> > 2017-06-21T06:48:31.491408328Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
> > 2017-06-21T06:48:31.491413053Z at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$
> CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
>
>

Re: Kafka Stream invalid partitions

Posted by Eno Thereska <en...@gmail.com>.
Hi there,

Thanks for the report. What version of Kafka are you using? Also, between runs do you change the number of partitions for your topics? I’m trying to figure out how this problem happens, any information on what is changing in between runs is appreciated.

Thanks,
Eno

> On Jun 27, 2017, at 8:52 AM, D Stephan <ka...@gmail.com> wrote:
> 
> Hello,
> 
> When I use KafkaStreams DSL GroupByKey and Aggregate APIs, I have randomly
> & frequently below exceptions:
> In my opinion, it is not practical to clean up the invalid partitions
> everydays.  For your information, this partition is an internal partition
> that automatically created by KafkaStream Aggregate API.
> Dou you have any idea or workarounds to mitigate this exception?
> 
> 
> 
> 
> 2017-06-21T06:48:31.488210812Z 2017-06-21 06:48:31.487 WARN 1 --- [
> StreamThread-4] o.a.k.s.p.i.InternalTopicManager :
> Could not create internal topics: Existing internal topic
> external-batch-request-store-repartition has invalid partitions.
> Expected: 20 Actual: 1. Use 'kafka.tools.StreamsResetter' tool to clean up
> invalid topics before processing. Retry #4
> 
> 2017-06-21T06:48:31.491071442Z Exception in thread "StreamThread-4"
> org.apache.kafka.streams.errors.StreamsException: Could not create internal
> topics.
> 2017-06-21T06:48:31.491087557Z at
> org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:70)
> 2017-06-21T06:48:31.491091661Z at
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:618)
> 2017-06-21T06:48:31.491096794Z at
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:372)
> 2017-06-21T06:48:31.491368662Z at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339)
> 2017-06-21T06:48:31.491390576Z at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488)
> 2017-06-21T06:48:31.491397476Z at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
> 2017-06-21T06:48:31.491403757Z at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
> 2017-06-21T06:48:31.491408328Z at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
> 2017-06-21T06:48:31.491413053Z at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)