Posted to users@kafka.apache.org by Dmitry Minkovsky <dm...@gmail.com> on 2017/04/11 20:09:50 UTC

Kafka Streams Application does not start after 10.1 to 10.2 update if topics need to be auto-created

I updated from 10.1 to 10.2. I updated both the broker and the Maven
dependency.

I am using topic auto-create. With 10.1, starting the application against a
broker that did not yet have the topics would sometimes result in an error like:

> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: stream-thread [StreamThread-1] Topic not found: $topic

But this would only happen once: on the second attempt, the topics were
already created and everything worked fine.

With 10.2, however, the error does not go away. I have confirmed that auto
topic creation is enabled on the broker.
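
(For reference, the broker-side setting in question is auto.create.topics.enable, e.g. in config/server.properties:)

    # broker config (config/server.properties)
    auto.create.topics.enable=true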

Here is the error/trace:


Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: stream-thread [StreamThread-1] Topic not found: session-updates
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$CopartitionedTopicsValidator.validate(StreamPartitionAssignor.java:734)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:648)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:368)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:745)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:334)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)


The error does not occur if my topology only defines streams and tables.
However, when I attempt to join a stream with a table, the error above is thrown:

        // No error if this is in topology
        KTable<K, V> sessions = topology.table(byteStringSerde, sessionSerde, "sessions", "sessions");

        // No error if this is in topology
        KStream<ByteString, Messages.EntityUpdate> sessionUpdates =
            topology.stream(byteStringSerde, sessionUpdateSerde, "session-updates");

        // Error if this is in topology
        sessionUpdates
          .leftJoin(sessions, (update, value) -> {
              // do update, omitted
          })
          .filter((k, v) -> v != null)
          .to(byteStringSerde, sessionSerde, "sessions");

Re: Kafka Streams Application does not start after 10.1 to 10.2 update if topics need to be auto-created

Posted by Dmitry Minkovsky <dm...@gmail.com>.
Thank you, Matthias,

I am using topic auto-creation just for convenience in development. My plan
was to figure out which topics I needed and then create them in an
initialization script in production. Auto-creation does make it easier to
iterate, at least for me.

Good to know this doesn't apply to internal topics! Since I am near
production, I will finally write that topics init script and go back up to
10.2 :) Really looking forward to GlobalKTables.
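
Roughly what I have in mind for that script; just a sketch, where the
partition count, replication factor, and ZooKeeper address are placeholders.
The two topics the join reads from get the same partition count, since
Streams requires joined topics to be co-partitioned:

    #!/bin/bash
    # create-topics.sh: create the application's source/sink topics up front,
    # instead of relying on broker-side auto-creation.
    ZK=localhost:2181    # placeholder ZooKeeper connect string
    PARTITIONS=4         # same count for co-partitioned (joined) topics
    REPLICATION=1

    for topic in sessions session-updates; do
      bin/kafka-topics.sh --create \
        --zookeeper "$ZK" \
        --replication-factor "$REPLICATION" \
        --partitions "$PARTITIONS" \
        --topic "$topic"
    done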

Thank you,
Dmitry

On Thu, Apr 13, 2017 at 1:36 PM, Eno Thereska <en...@gmail.com> wrote:

> No, internal topics do not need to be manually created.
>
> Eno

Re: Kafka Streams Application does not start after 10.1 to 10.2 update if topics need to be auto-created

Posted by Eno Thereska <en...@gmail.com>.
No, internal topics do not need to be manually created.

Eno
> On 13 Apr 2017, at 10:00, Shimi Kiviti <sh...@gmail.com> wrote:
> 
> Is that (manual topic creation) also true for internal topics?
> 


Re: Kafka Streams Application does not start after 10.1 to 10.2 update if topics need to be auto-created

Posted by Shimi Kiviti <sh...@gmail.com>.
Is that (manual topic creation) also true for internal topics?

On Thu, 13 Apr 2017 at 19:14 Matthias J. Sax <ma...@confluent.io> wrote:

> Hi,
>
> thanks for reporting this issue. We are aware of a bug in 0.10.2 that
> seems to be related: https://issues.apache.org/jira/browse/KAFKA-5037
>
> However, I also want to point out that it is highly recommended not to
> use auto topic create for Streams, but to manually create all
> input/output topics before you start your Streams application.
>
> For more details, see
>
> http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application
>
>
> May I ask why you are using topic auto-create?
>
>
> -Matthias

Re: Kafka Streams Application does not start after 10.1 to 10.2 update if topics need to be auto-created

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

thanks for reporting this issue. We are aware of a bug in 0.10.2 that
seems to be related: https://issues.apache.org/jira/browse/KAFKA-5037

However, I also want to point out that it is highly recommended not to use
auto topic create for Streams, but to manually create all input/output
topics before you start your Streams application.

For more details, see
http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application
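
For example, an input topic can be created up front with the kafka-topics
tool that ships with Kafka (just a sketch; the topic name, partition count,
replication factor, and ZooKeeper address below are placeholders):

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
        --replication-factor 1 --partitions 4 --topic session-updates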


May I ask why you are using topic auto-create?


-Matthias

