Posted to dev@kafka.apache.org by "Greg Fodor (JIRA)" <ji...@apache.org> on 2016/04/12 05:04:25 UTC

[jira] [Comment Edited] (KAFKA-3544) Missing topics on startup

    [ https://issues.apache.org/jira/browse/KAFKA-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15236512#comment-15236512 ] 

Greg Fodor edited comment on KAFKA-3544 at 4/12/16 3:04 AM:
------------------------------------------------------------

Not sure of the best way to share the topology. Here's the relevant part of the code:
{code}
		builder
			.stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming")
			.map((k, v) -> KeyValue.pair(v.getUserId(), v))
			.to(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

		KStream<String, RoomOperationMessage> roomOperationMessagesByUserId = builder
			.stream(Serdes.String(), roomOperationSerde, "room_operation_message_incoming-user_id");

		KStream<String, UserBroadcastsMessage> userBroadcastsMessagesByUserId =
			roomOperationMessagesByUserId.leftJoin(userSpaceBroadcastsByUserId, UserBroadcastsMessage::new);

{code}

In this example roomOperationSerde is a Serde for a custom Avro type. I'm basically pivoting the first stream onto a foreign key and then creating another KStream off of that output for a join downstream.

The topology is failing to build on the room_operation_message_incoming-user_id topic:

{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2]
Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
{code}


was (Author: gfodor):
Not sure of the best way to share the topology. Here's the relevant part of the code:
{code}
		builder
			.stream(Serdes.Long(), userSpaceBroadcastSerde, "positron-db-user_space_broadcasts")
			.map((id, broadcast) -> KeyValue.pair(broadcast.getUserId().toString(), broadcast))
			.to(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id");

		KTable<String, UserSpaceBroadcasts> userSpaceBroadcastsByUserId = builder
			.stream(Serdes.String(), userSpaceBroadcastSerde, "user_space_broadcasts-user_id")
			.aggregateByKey(...);
{code}

In this example userSpaceBroadcastSerde is a Serde for a custom avro type. I'm basically pivoting the first stream onto a foreign key and then creating a KTable off of that output by tapping it and then aggregating. (Given our discussions on other tickets there may be a way to simplify this, but I wanted to capture it as-is for this report.)

The topology is failing to build on the user_space_broadcasts-user_id topic:

{code}
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
2016-04-12 02:57:36 StreamThread [INFO] Stream thread shutdown complete [StreamThread-2]
Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: room_operation_message_incoming-user_id
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:435)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:423)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:200)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:233)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:385)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$700(AbstractCoordinator.java:80)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:339)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:321)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:638)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:381)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:321)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:220)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:317)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:898)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:860)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
{code}

> Missing topics on startup
> -------------------------
>
>                 Key: KAFKA-3544
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3544
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Greg Fodor
>            Assignee: Guozhang Wang
>              Labels: semantics
>
> When running a relatively complex job with multiple tasks and state stores, on the first run I get errors due to some of the intermediate topics not existing. Subsequent runs work OK. My assumption is streams may be creating topics lazily, so if downstream tasks are initializing before their parents have had a chance to create their necessary topics then the children will attempt to start consuming from topics that do not exist yet.
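
If the assumption above holds (it is unconfirmed in this report), one stopgap is to check before startup that every topic the topology reads from already exists. The following is a minimal sketch of such a check, not a fix for the underlying bug. `TopicPreflight` and `missingTopics` are hypothetical names, not part of Kafka Streams, and the `existing` set is hard-coded here for illustration. In a real application it could be taken from `KafkaConsumer.listTopics().keySet()`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class TopicPreflight {

    private TopicPreflight() {}

    /**
     * Returns the required topics that are absent from the existing ones,
     * preserving the order in which the required topics were listed.
     */
    static Set<String> missingTopics(Collection<String> required, Set<String> existing) {
        Set<String> missing = new LinkedHashSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    public static void main(String[] args) {
        // Source topics used by the topology in this report.
        Collection<String> required = Arrays.asList(
            "room_operation_message_incoming",
            "room_operation_message_incoming-user_id");

        // Assumption: stand-in for the topic list a broker would report on a
        // first run, where the intermediate topic has not been created yet.
        Set<String> existing = new HashSet<>(
            Arrays.asList("room_operation_message_incoming"));

        Set<String> missing = missingTopics(required, existing);
        if (!missing.isEmpty()) {
            System.out.println("Create these topics before starting the job: " + missing);
        }
    }
}
```

With a check like this, the job can refuse to start (or the operator can create the intermediate topics by hand) instead of failing inside the partition assignor on the first run.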


