Posted to users@kafka.apache.org by Dmitry Minkovsky <dm...@gmail.com> on 2017/04/30 23:37:41 UTC

Producer does not fetch metadata when attempting send to topic

I am attempting to send messages to two topics with a newly created
producer.

The first message sends fine, but for some reason, the producer does not
fetch metadata for the second topic before attempting to send. So sending
to the second topic fails. The producer fetches metadata for the second
topic only after it fails sending to it for the first time.

Here is my test (Groovy/Java 8):


    // Helper function: adapts the producer's callback-based send() to a
    // CompletableFuture

    def send(KafkaProducer producer, ProducerRecord record) {
        CompletableFuture<Void> future = new CompletableFuture<>()
        producer.send(record, { meta, e ->
            if (e != null) {
                println "send failed exceptionally ${e.message}"
                future.completeExceptionally(e)
            } else {
                println "send succeeded"
                future.complete(null)
            }
        })
        future
    }


    // The test

    def "test"() {

        Properties config = new Properties();
        config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092');
        config.put(ProducerConfig.ACKS_CONFIG, 'all');
        config.put(ProducerConfig.RETRIES_CONFIG, 0);

        def producer = new KafkaProducer(config,
            new ByteArraySerializer(), new ByteArraySerializer())

        def topic1 = 'topic-1'
        def topic2 = 'topic-2'
        def key1 = 'key-1'.bytes
        def key2 = 'key-2'.bytes
        def message1 = 'some bytes'.bytes
        def message2 = 'more bytes'.bytes

        println "==== SENDING 1 ===="
        try {
            send(producer, new ProducerRecord(topic1, key1, message1))
              .thenCompose({
                println "==== SENDING 2 ===="
                send(producer, new ProducerRecord(topic2, key2, message2))
              })
              .toCompletableFuture().get()
        }
        catch (Exception e) {
            // The above try clause throws. Catch here so that the sleep below
            // happens. During sleep, metadata for `topic-2` is retrieved.
            // Subsequent attempts to send to `topic-2` will succeed. But the
            // first attempt fails.
        }

        println 'before sleeping'
        sleep 5000
        println 'after sleeping'


        expect:
        true
    }


Result:

[2017-04-30 19:21:08,977] INFO (org.apache.kafka.clients.producer.ProducerConfig:180) ProducerConfig values:
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

[2017-04-30 19:21:09,192] INFO (org.apache.kafka.common.utils.AppInfoParser:83) Kafka version : 0.10.2.1
[2017-04-30 19:21:09,193] INFO (org.apache.kafka.common.utils.AppInfoParser:84) Kafka commitId : e89bffd6b2eff799
==== SENDING 1 ====
[2017-04-30 19:21:09,358] DEBUG (org.apache.kafka.clients.NetworkClient:767) Initialize connection to node -1 for sending metadata request
[2017-04-30 19:21:09,358] DEBUG (org.apache.kafka.clients.NetworkClient:627) Initiating connection to node -1 at localhost:9092.
[2017-04-30 19:21:09,694] DEBUG (org.apache.kafka.clients.NetworkClient:590) Completed connection to node -1.  Fetching API versions.
[2017-04-30 19:21:09,695] DEBUG (org.apache.kafka.clients.NetworkClient:603) Initiating API versions fetch from node -1.
[2017-04-30 19:21:09,833] DEBUG (org.apache.kafka.clients.NetworkClient:558) Recorded API versions for node -1: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0])
[2017-04-30 19:21:09,834] DEBUG (org.apache.kafka.clients.NetworkClient:751) Sending metadata request (type=MetadataRequest, topics=topic-1) to node -1
[2017-04-30 19:21:09,962] DEBUG (org.apache.kafka.clients.NetworkClient:627) Initiating connection to node 0 at 127.0.0.1:9092.
[2017-04-30 19:21:09,974] DEBUG (org.apache.kafka.clients.NetworkClient:590) Completed connection to node 0.  Fetching API versions.
[2017-04-30 19:21:09,974] DEBUG (org.apache.kafka.clients.NetworkClient:603) Initiating API versions fetch from node 0.
[2017-04-30 19:21:09,977] DEBUG (org.apache.kafka.clients.NetworkClient:558) Recorded API versions for node 0: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0])
send succeeded
==== SENDING 2 ====
send failed exceptionally Failed to update metadata after 5000 ms.
before sleeping
[2017-04-30 19:21:15,026] DEBUG (org.apache.kafka.clients.NetworkClient:751) Sending metadata request (type=MetadataRequest, topics=topic-1,topic-2) to node 0
after sleeping

Re: Producer does not fetch metadata when attempting send to topic

Posted by Dmitry Minkovsky <dm...@gmail.com>.
I figured it out :).

After playing with various sync/async variants of the above example, it's
quite simple: I needed to get off the callback thread of
`Producer#send(record, callback)` before making the second send call. The
callback runs on the producer's I/O thread, and the second `send()` blocks
that same thread while it waits for `topic-2` metadata, so the thread can
never get back to its loop to process the metadata response, and the wait
times out after `max.block.ms`. To fix it, I just changed `thenCompose()`
to `thenComposeAsync()`, which runs the continuation on another thread.
Probably should have been doing this anyway, and probably with an explicit
executor service, but that's another story :).
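
For reference, a minimal sketch of the fixed chain (same helper as above;
the explicit executor is an illustrative addition rather than part of my
original test, and assumes `java.util.concurrent.Executors` is imported):

    // thenComposeAsync() hands the continuation to the executor, so the
    // producer's I/O thread returns to its event loop after invoking the
    // callback and is free to fetch metadata for topic-2.
    def executor = Executors.newSingleThreadExecutor()

    send(producer, new ProducerRecord(topic1, key1, message1))
      .thenComposeAsync({
        send(producer, new ProducerRecord(topic2, key2, message2))
      }, executor)
      .toCompletableFuture().get()

    executor.shutdown()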

Hooray!
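
P.S. Another option that would avoid the first-send timeout entirely,
assuming a brief block at startup is acceptable, is to warm the metadata
cache before composing the sends. `partitionsFor()` blocks (up to
`max.block.ms`) until the topic's metadata is available. A sketch:

    // Force a metadata fetch for each topic up front, so no send() has
    // to wait for a metadata round trip later.
    [topic1, topic2].each { topic ->
        producer.partitionsFor(topic)
    }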
