You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Jaikiran Pai <ja...@gmail.com> on 2017/03/02 04:35:09 UTC

Re: Consumption on a explicitly (dynamically) created topic has a 5 minute delay

For future reference - I asked this question on dev mailing list and 
based on the discussion there was able to come up with a workaround to 
get this working. Details here 
https://www.mail-archive.com/dev@kafka.apache.org/msg67613.html

-Jaikiran

On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote:
> We are on Kafka 0.10.0.1 (server and client) and use Java 
> consumer/producer APIs. We have an application where we create Kafka 
> topics dynamically (using the AdminUtils Java API) and then start 
> producing/consuming on those topics. The issue we frequently run into 
> is this:
>
> 1. Application process creates a topic "foo-bar" via 
> AdminUtils.createTopic. This is sucessfully completed.
> 2. Same application process then creates a consumer (using new Java 
> consumer API) on that foo-bar topic as a next step.
> 3. The consumer that gets created in step#2 however, doesn't seem to 
> be enrolled in consumer group for this topic because of this (notice 
> the last line in the log):
>
> 2017-02-21 00:58:43,359 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
> 2017-02-21 00:58:43,360 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to 
> topic(s): foo-bar
> 2017-02-21 00:58:43,543 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> Received group coordinator response 
> ClientResponse(receivedTimeMs=1487667523542, disconnected=false, 
> request=ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f, 
> request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, 
> body={group_id=my-app-group}), createdTimeMs=1487667523378, 
> sendTimeMs=1487667523529), 
> responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
> 2017-02-21 00:58:43,543 [Thread-6] INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for 
> group my-app-group.
> 2017-02-21 00:58:43,545 [Thread-6] INFO 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> Revoking previously assigned partitions [] for group my-app-group
> 2017-02-21 00:58:43,545 [Thread-6] INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> (Re-)joining group my-app-group
> 2017-02-21 00:58:43,548 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> Sending JoinGroup 
> ({group_id=my-app-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 
> lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: 
> null)
> 2017-02-21 00:58:43,548 [Thread-6] DEBUG 
> org.apache.kafka.common.metrics.Metrics - Added sensor with name 
> node-2147483647.bytes-sent
> 2017-02-21 00:58:43,549 [Thread-6] DEBUG 
> org.apache.kafka.common.metrics.Metrics - Added sensor with name 
> node-2147483647.bytes-received
> 2017-02-21 00:58:43,549 [Thread-6] DEBUG 
> org.apache.kafka.common.metrics.Metrics - Added sensor with name 
> node-2147483647.latency
> 2017-02-21 00:58:43,552 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> Received successful join group response for group my-app-group: 
> {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0 
> lim=59 cap=59]}]}
> 2017-02-21 00:58:43,552 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
> Performing assignment for group my-app-group using strategy range with 
> subscriptions 
> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
> *2017-02-21 00:58:43,552 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor 
> - Skipping assignment for topic foo-bar since no metadata is available*
>
>
> 4. A few seconds later, a separate process, produces (via Java 
> producer API) on the foo-bar topic, some messages.
> 5. The consumer created in step#2 (although is waiting for messages) 
> on the foo-bar topic, _doesn't_ consume these messages.
> 6. *5 minutes later* the Kafka server triggers a consumer rebalance 
> which then successfully assigns partition(s) of this foo-bar topic to 
> consumer created in step#2 and the consumer start consuming these 
> messages.
>
> This 5 minute delay in consuming messages from this dynamically 
> created topic is what we want to avoid. Is there anyway I can 
> deterministically do/force creation of a dynamic topic and be assured 
> that upon completion of that call, I can create a consumer and start 
> consuming of that topic such that it can receive messages as soon as 
> the messages are produced on that topic, without having to wait for a 
> 5 minute delay (or whatever the rebalance configuration is)? In 
> essence, is there a way to ensure that the Kafka consumer does get the 
> topic metadata for a topic that was created successfully by the same 
> application, immediately?
>
>
> P.S: We have topic auto creation disabled, so this isn't really a auto 
> topic creation issue. In our case we are explicitly invoking the 
> create topic operation via the Java API.
>
> -Jaikiran


Re: Consumption on a explicitly (dynamically) created topic has a 5 minute delay

Posted by Jaikiran Pai <ja...@gmail.com>.
Thank you for pointing me to that JIRA. It indeed is the same issue we 
discussed in this thread. I'll keep a watch on that JIRA for the code to 
be merged.

-Jaikiran

On Thursday 02 March 2017 07:11 PM, Rajini Sivaram wrote:
> This issue is being addressed in KAFKA-4631. See
> https://issues.apache.org/jira/browse/KAFKA-4631 and the discussion in the
> PR https://github.com/apache/kafka/pull/2622 for details.
>
> Regards,
>
> Rajini
>
> On Thu, Mar 2, 2017 at 4:35 AM, Jaikiran Pai <ja...@gmail.com>
> wrote:
>
>> For future reference - I asked this question on dev mailing list and based
>> on the discussion there was able to come up with a workaround to get this
>> working. Details here https://www.mail-archive.com/d
>> ev@kafka.apache.org/msg67613.html
>>
>> -Jaikiran
>>
>>
>> On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote:
>>
>>> We are on Kafka 0.10.0.1 (server and client) and use Java
>>> consumer/producer APIs. We have an application where we create Kafka topics
>>> dynamically (using the AdminUtils Java API) and then start
>>> producing/consuming on those topics. The issue we frequently run into is
>>> this:
>>>
>>> 1. Application process creates a topic "foo-bar" via
>>> AdminUtils.createTopic. This is sucessfully completed.
>>> 2. Same application process then creates a consumer (using new Java
>>> consumer API) on that foo-bar topic as a next step.
>>> 3. The consumer that gets created in step#2 however, doesn't seem to be
>>> enrolled in consumer group for this topic because of this (notice the last
>>> line in the log):
>>>
>>> 2017-02-21 00:58:43,359 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer
>>> - Kafka consumer created
>>> 2017-02-21 00:58:43,360 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer
>>> - Subscribed to topic(s): foo-bar
>>> 2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - Received group coordinator response
>>> ClientResponse(receivedTimeMs=1487667523542, disconnected=false,
>>> request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clie
>>> nts.consumer.internals.ConsumerNetworkClient$RequestFutureCo
>>> mpletionHandler@50aad50f, request=RequestSend(header={ap
>>> i_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
>>> body={group_id=my-app-group}), createdTimeMs=1487667523378,
>>> sendTimeMs=1487667523529), responseBody={error_code=0,coo
>>> rdinator={node_id=0,host=localhost,port=9092}})
>>> 2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - Discovered coordinator
>>> localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
>>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>>> mer.internals.ConsumerCoordinator - Revoking previously assigned
>>> partitions [] for group my-app-group
>>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - (Re-)joining group my-app-group
>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - Sending JoinGroup
>>> ({group_id=my-app-group,session_timeout=30000,member_id=,
>>> protocol_type=consumer,group_protocols=[{protocol_name=
>>> range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]})
>>> to coordinator localhost:9092 (id: 2147483647 <(214)%20748-3647> rack:
>>> null)
>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>>> - Added sensor with name node-2147483647.bytes-sent
>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>>> - Added sensor with name node-2147483647.bytes-received
>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>>> - Added sensor with name node-2147483647.latency
>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - Received successful join group
>>> response for group my-app-group: {error_code=0,generation_id=1,
>>> group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe
>>> -87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-
>>> 87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-
>>> 402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
>>> lim=59 cap=59]}]}
>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.ConsumerCoordinator - Performing assignment for group
>>> my-app-group using strategy range with subscriptions
>>> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscriptio
>>> n(topics=[foo-bar])}
>>> *2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.AbstractPartitionAssignor - Skipping assignment for topic
>>> foo-bar since no metadata is available*
>>>
>>>
>>> 4. A few seconds later, a separate process, produces (via Java producer
>>> API) on the foo-bar topic, some messages.
>>> 5. The consumer created in step#2 (although is waiting for messages) on
>>> the foo-bar topic, _doesn't_ consume these messages.
>>> 6. *5 minutes later* the Kafka server triggers a consumer rebalance which
>>> then successfully assigns partition(s) of this foo-bar topic to consumer
>>> created in step#2 and the consumer start consuming these messages.
>>>
>>> This 5 minute delay in consuming messages from this dynamically created
>>> topic is what we want to avoid. Is there anyway I can deterministically
>>> do/force creation of a dynamic topic and be assured that upon completion of
>>> that call, I can create a consumer and start consuming of that topic such
>>> that it can receive messages as soon as the messages are produced on that
>>> topic, without having to wait for a 5 minute delay (or whatever the
>>> rebalance configuration is)? In essence, is there a way to ensure that the
>>> Kafka consumer does get the topic metadata for a topic that was created
>>> successfully by the same application, immediately?
>>>
>>>
>>> P.S: We have topic auto creation disabled, so this isn't really a auto
>>> topic creation issue. In our case we are explicitly invoking the create
>>> topic operation via the Java API.
>>>
>>> -Jaikiran
>>>
>>


Re: Consumption on a explicitly (dynamically) created topic has a 5 minute delay

Posted by Jaikiran Pai <ja...@gmail.com>.
Thank you for pointing me to that JIRA. It indeed is the same issue we 
discussed in this thread. I'll keep a watch on that JIRA for the code to 
be merged.

-Jaikiran

On Thursday 02 March 2017 07:11 PM, Rajini Sivaram wrote:
> This issue is being addressed in KAFKA-4631. See
> https://issues.apache.org/jira/browse/KAFKA-4631 and the discussion in the
> PR https://github.com/apache/kafka/pull/2622 for details.
>
> Regards,
>
> Rajini
>
> On Thu, Mar 2, 2017 at 4:35 AM, Jaikiran Pai <ja...@gmail.com>
> wrote:
>
>> For future reference - I asked this question on dev mailing list and based
>> on the discussion there was able to come up with a workaround to get this
>> working. Details here https://www.mail-archive.com/d
>> ev@kafka.apache.org/msg67613.html
>>
>> -Jaikiran
>>
>>
>> On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote:
>>
>>> We are on Kafka 0.10.0.1 (server and client) and use Java
>>> consumer/producer APIs. We have an application where we create Kafka topics
>>> dynamically (using the AdminUtils Java API) and then start
>>> producing/consuming on those topics. The issue we frequently run into is
>>> this:
>>>
>>> 1. Application process creates a topic "foo-bar" via
>>> AdminUtils.createTopic. This is sucessfully completed.
>>> 2. Same application process then creates a consumer (using new Java
>>> consumer API) on that foo-bar topic as a next step.
>>> 3. The consumer that gets created in step#2 however, doesn't seem to be
>>> enrolled in consumer group for this topic because of this (notice the last
>>> line in the log):
>>>
>>> 2017-02-21 00:58:43,359 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer
>>> - Kafka consumer created
>>> 2017-02-21 00:58:43,360 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer
>>> - Subscribed to topic(s): foo-bar
>>> 2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - Received group coordinator response
>>> ClientResponse(receivedTimeMs=1487667523542, disconnected=false,
>>> request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clie
>>> nts.consumer.internals.ConsumerNetworkClient$RequestFutureCo
>>> mpletionHandler@50aad50f, request=RequestSend(header={ap
>>> i_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
>>> body={group_id=my-app-group}), createdTimeMs=1487667523378,
>>> sendTimeMs=1487667523529), responseBody={error_code=0,coo
>>> rdinator={node_id=0,host=localhost,port=9092}})
>>> 2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - Discovered coordinator
>>> localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
>>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>>> mer.internals.ConsumerCoordinator - Revoking previously assigned
>>> partitions [] for group my-app-group
>>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - (Re-)joining group my-app-group
>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - Sending JoinGroup
>>> ({group_id=my-app-group,session_timeout=30000,member_id=,
>>> protocol_type=consumer,group_protocols=[{protocol_name=
>>> range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]})
>>> to coordinator localhost:9092 (id: 2147483647 <(214)%20748-3647> rack:
>>> null)
>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>>> - Added sensor with name node-2147483647.bytes-sent
>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>>> - Added sensor with name node-2147483647.bytes-received
>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>>> - Added sensor with name node-2147483647.latency
>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.AbstractCoordinator - Received successful join group
>>> response for group my-app-group: {error_code=0,generation_id=1,
>>> group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe
>>> -87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-
>>> 87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-
>>> 402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
>>> lim=59 cap=59]}]}
>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.ConsumerCoordinator - Performing assignment for group
>>> my-app-group using strategy range with subscriptions
>>> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscriptio
>>> n(topics=[foo-bar])}
>>> *2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>>> mer.internals.AbstractPartitionAssignor - Skipping assignment for topic
>>> foo-bar since no metadata is available*
>>>
>>>
>>> 4. A few seconds later, a separate process, produces (via Java producer
>>> API) on the foo-bar topic, some messages.
>>> 5. The consumer created in step#2 (although is waiting for messages) on
>>> the foo-bar topic, _doesn't_ consume these messages.
>>> 6. *5 minutes later* the Kafka server triggers a consumer rebalance which
>>> then successfully assigns partition(s) of this foo-bar topic to consumer
>>> created in step#2 and the consumer start consuming these messages.
>>>
>>> This 5 minute delay in consuming messages from this dynamically created
>>> topic is what we want to avoid. Is there anyway I can deterministically
>>> do/force creation of a dynamic topic and be assured that upon completion of
>>> that call, I can create a consumer and start consuming of that topic such
>>> that it can receive messages as soon as the messages are produced on that
>>> topic, without having to wait for a 5 minute delay (or whatever the
>>> rebalance configuration is)? In essence, is there a way to ensure that the
>>> Kafka consumer does get the topic metadata for a topic that was created
>>> successfully by the same application, immediately?
>>>
>>>
>>> P.S: We have topic auto creation disabled, so this isn't really a auto
>>> topic creation issue. In our case we are explicitly invoking the create
>>> topic operation via the Java API.
>>>
>>> -Jaikiran
>>>
>>


Re: Consumption on a explicitly (dynamically) created topic has a 5 minute delay

Posted by Rajini Sivaram <ra...@gmail.com>.
This issue is being addressed in KAFKA-4631. See
https://issues.apache.org/jira/browse/KAFKA-4631 and the discussion in the
PR https://github.com/apache/kafka/pull/2622 for details.

Regards,

Rajini

On Thu, Mar 2, 2017 at 4:35 AM, Jaikiran Pai <ja...@gmail.com>
wrote:

> For future reference - I asked this question on dev mailing list and based
> on the discussion there was able to come up with a workaround to get this
> working. Details here https://www.mail-archive.com/d
> ev@kafka.apache.org/msg67613.html
>
> -Jaikiran
>
>
> On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote:
>
>> We are on Kafka 0.10.0.1 (server and client) and use Java
>> consumer/producer APIs. We have an application where we create Kafka topics
>> dynamically (using the AdminUtils Java API) and then start
>> producing/consuming on those topics. The issue we frequently run into is
>> this:
>>
>> 1. Application process creates a topic "foo-bar" via
>> AdminUtils.createTopic. This is sucessfully completed.
>> 2. Same application process then creates a consumer (using new Java
>> consumer API) on that foo-bar topic as a next step.
>> 3. The consumer that gets created in step#2 however, doesn't seem to be
>> enrolled in consumer group for this topic because of this (notice the last
>> line in the log):
>>
>> 2017-02-21 00:58:43,359 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer
>> - Kafka consumer created
>> 2017-02-21 00:58:43,360 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer
>> - Subscribed to topic(s): foo-bar
>> 2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Received group coordinator response
>> ClientResponse(receivedTimeMs=1487667523542, disconnected=false,
>> request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clie
>> nts.consumer.internals.ConsumerNetworkClient$RequestFutureCo
>> mpletionHandler@50aad50f, request=RequestSend(header={ap
>> i_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
>> body={group_id=my-app-group}), createdTimeMs=1487667523378,
>> sendTimeMs=1487667523529), responseBody={error_code=0,coo
>> rdinator={node_id=0,host=localhost,port=9092}})
>> 2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Discovered coordinator
>> localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>> mer.internals.ConsumerCoordinator - Revoking previously assigned
>> partitions [] for group my-app-group
>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - (Re-)joining group my-app-group
>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Sending JoinGroup
>> ({group_id=my-app-group,session_timeout=30000,member_id=,
>> protocol_type=consumer,group_protocols=[{protocol_name=
>> range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]})
>> to coordinator localhost:9092 (id: 2147483647 <(214)%20748-3647> rack:
>> null)
>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>> - Added sensor with name node-2147483647.bytes-sent
>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>> - Added sensor with name node-2147483647.bytes-received
>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>> - Added sensor with name node-2147483647.latency
>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Received successful join group
>> response for group my-app-group: {error_code=0,generation_id=1,
>> group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe
>> -87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-
>> 87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-
>> 402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
>> lim=59 cap=59]}]}
>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.ConsumerCoordinator - Performing assignment for group
>> my-app-group using strategy range with subscriptions
>> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscriptio
>> n(topics=[foo-bar])}
>> *2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractPartitionAssignor - Skipping assignment for topic
>> foo-bar since no metadata is available*
>>
>>
>> 4. A few seconds later, a separate process, produces (via Java producer
>> API) on the foo-bar topic, some messages.
>> 5. The consumer created in step#2 (although is waiting for messages) on
>> the foo-bar topic, _doesn't_ consume these messages.
>> 6. *5 minutes later* the Kafka server triggers a consumer rebalance which
>> then successfully assigns partition(s) of this foo-bar topic to consumer
>> created in step#2 and the consumer start consuming these messages.
>>
>> This 5 minute delay in consuming messages from this dynamically created
>> topic is what we want to avoid. Is there anyway I can deterministically
>> do/force creation of a dynamic topic and be assured that upon completion of
>> that call, I can create a consumer and start consuming of that topic such
>> that it can receive messages as soon as the messages are produced on that
>> topic, without having to wait for a 5 minute delay (or whatever the
>> rebalance configuration is)? In essence, is there a way to ensure that the
>> Kafka consumer does get the topic metadata for a topic that was created
>> successfully by the same application, immediately?
>>
>>
>> P.S: We have topic auto creation disabled, so this isn't really a auto
>> topic creation issue. In our case we are explicitly invoking the create
>> topic operation via the Java API.
>>
>> -Jaikiran
>>
>
>

Re: Consumption on a explicitly (dynamically) created topic has a 5 minute delay

Posted by Rajini Sivaram <ra...@gmail.com>.
This issue is being addressed in KAFKA-4631. See
https://issues.apache.org/jira/browse/KAFKA-4631 and the discussion in the
PR https://github.com/apache/kafka/pull/2622 for details.

Regards,

Rajini

On Thu, Mar 2, 2017 at 4:35 AM, Jaikiran Pai <ja...@gmail.com>
wrote:

> For future reference - I asked this question on dev mailing list and based
> on the discussion there was able to come up with a workaround to get this
> working. Details here https://www.mail-archive.com/d
> ev@kafka.apache.org/msg67613.html
>
> -Jaikiran
>
>
> On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote:
>
>> We are on Kafka 0.10.0.1 (server and client) and use Java
>> consumer/producer APIs. We have an application where we create Kafka topics
>> dynamically (using the AdminUtils Java API) and then start
>> producing/consuming on those topics. The issue we frequently run into is
>> this:
>>
>> 1. Application process creates a topic "foo-bar" via
>> AdminUtils.createTopic. This is sucessfully completed.
>> 2. Same application process then creates a consumer (using new Java
>> consumer API) on that foo-bar topic as a next step.
>> 3. The consumer that gets created in step#2 however, doesn't seem to be
>> enrolled in consumer group for this topic because of this (notice the last
>> line in the log):
>>
>> 2017-02-21 00:58:43,359 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer
>> - Kafka consumer created
>> 2017-02-21 00:58:43,360 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer
>> - Subscribed to topic(s): foo-bar
>> 2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Received group coordinator response
>> ClientResponse(receivedTimeMs=1487667523542, disconnected=false,
>> request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clie
>> nts.consumer.internals.ConsumerNetworkClient$RequestFutureCo
>> mpletionHandler@50aad50f, request=RequestSend(header={ap
>> i_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
>> body={group_id=my-app-group}), createdTimeMs=1487667523378,
>> sendTimeMs=1487667523529), responseBody={error_code=0,coo
>> rdinator={node_id=0,host=localhost,port=9092}})
>> 2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Discovered coordinator
>> localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>> mer.internals.ConsumerCoordinator - Revoking previously assigned
>> partitions [] for group my-app-group
>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - (Re-)joining group my-app-group
>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Sending JoinGroup
>> ({group_id=my-app-group,session_timeout=30000,member_id=,
>> protocol_type=consumer,group_protocols=[{protocol_name=
>> range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]})
>> to coordinator localhost:9092 (id: 2147483647 <(214)%20748-3647> rack:
>> null)
>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>> - Added sensor with name node-2147483647.bytes-sent
>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>> - Added sensor with name node-2147483647.bytes-received
>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
>> - Added sensor with name node-2147483647.latency
>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Received successful join group
>> response for group my-app-group: {error_code=0,generation_id=1,
>> group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe
>> -87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-
>> 87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-
>> 402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
>> lim=59 cap=59]}]}
>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.ConsumerCoordinator - Performing assignment for group
>> my-app-group using strategy range with subscriptions
>> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscriptio
>> n(topics=[foo-bar])}
>> *2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractPartitionAssignor - Skipping assignment for topic
>> foo-bar since no metadata is available*
>>
>>
>> 4. A few seconds later, a separate process, produces (via Java producer
>> API) on the foo-bar topic, some messages.
>> 5. The consumer created in step#2 (although is waiting for messages) on
>> the foo-bar topic, _doesn't_ consume these messages.
>> 6. *5 minutes later* the Kafka server triggers a consumer rebalance which
>> then successfully assigns partition(s) of this foo-bar topic to consumer
>> created in step#2 and the consumer start consuming these messages.
>>
>> This 5 minute delay in consuming messages from this dynamically created
>> topic is what we want to avoid. Is there anyway I can deterministically
>> do/force creation of a dynamic topic and be assured that upon completion of
>> that call, I can create a consumer and start consuming of that topic such
>> that it can receive messages as soon as the messages are produced on that
>> topic, without having to wait for a 5 minute delay (or whatever the
>> rebalance configuration is)? In essence, is there a way to ensure that the
>> Kafka consumer does get the topic metadata for a topic that was created
>> successfully by the same application, immediately?
>>
>>
>> P.S: We have topic auto creation disabled, so this isn't really a auto
>> topic creation issue. In our case we are explicitly invoking the create
>> topic operation via the Java API.
>>
>> -Jaikiran
>>
>
>