You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Amit Karyekar <ak...@fanatics.com> on 2015/11/24 19:40:13 UTC

Producer property to set to enable async data transfer in kafka 8.2.2

Hi folks,

We are working with Kafka 8.2.2 and want to use producer.type as async for sending messages to broker.

In Kakfka 8.2.2, some new producer properties have been introduced. However, there is no new name for the property producer.type mentioned in the documentation.

We’ve the following configuration for Kafka producer:

    bootstrap.servers: localhost:9092

    serializer.class: org.apache.kafka.common.serialization.StringSerializer

    key.serializer: org.apache.kafka.common.serialization.StringSerializer

    value.serializer: org.apache.kafka.common.serialization.StringSerializer

    acks: 1

    producer.type: async

On sending messages, we are seeing following message in logs:

2015-11-23 18:18:50 TRACE KafkaProducer:158 - Starting the Kafka producer
2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata version 1 to Cluster(nodes = [Node(localhost, 9092)], partitions = [])
2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration producer.type = null was supplied but isn't a known config.
2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration serializer.class = null was supplied but isn't a known config.
2015-11-23 18:18:50 DEBUG KafkaProducer:231 - Kafka producer started
2015-11-23 18:18:50 DEBUG Sender:117 - Starting Kafka producer I/O thread.
2015-11-23 18:18:50 TRACE KafkaProducer:374 - Requesting metadata update for topic ArgosHeartbeat.
2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata request to node -1
2015-11-23 18:18:50 DEBUG NetworkClient:397 - Init connection to node -1 for sending metadata request in the next iteration
2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to node -1 at localhost:9092.
2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata request to node -1
2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to node -1
2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata request to node -1
2015-11-23 18:18:50 DEBUG NetworkClient:392 - Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-9}, body={topics=[ArgosHeartbeat]})) to node -1
2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata version 2 to Cluster(nodes = [Node(0, 192.168.99.1, 9092)], partitions = [Partition(topic = ArgosHeartbeat, partition = 2, leader = 0, replicas = [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition = 1, leader = 0, replicas = [0,], isr = [0,]])
2015-11-23 18:18:50 TRACE KafkaProducer:337 - Sending record ProducerRecord(topic=ArgosHeartbeat, partition=0, key=S109, value={"logType":"heartbeat","applicationType":"Java","StreamId":"S109","RequestId":"R109","ArgosTimestamp":1448331530502,"Timestamp":1447896190} with callback null to topic ArgosHeartbeat partition 0
2015-11-23 18:18:50 TRACE RecordAccumulator:156 - Allocating a new 16384 byte message buffer for topic ArgosHeartbeat partition 0
2015-11-23 18:18:50 TRACE KafkaProducer:340 - Waking up the sender since topic ArgosHeartbeat partition 0 is either full or getting a new batch
2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to node 0 at 192.168.99.1:9092.
2015-11-23 18:18:50 TRACE KafkaProducer:419 - Closing the Kafka producer.
2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to node 0
2015-11-23 18:18:50 DEBUG Sender:128 - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
2015-11-23 18:18:50 TRACE Sender:182 - Nodes with data ready to send: [Node(0, 192.168.99.1, 9092)]
2015-11-23 18:18:50 TRACE Sender:183 - Created 1 produce requests: [ClientRequest(expectResponse=true, payload={ArgosHeartbeat-0=RecordBatch(topicPartition=ArgosHeartbeat-0, recordCount=1)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1,client_id=producer-9}, body={acks=1,timeout=30000,topic_data=[{topic=ArgosHeartbeat,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=169 cap=16384]}]}]}))]
2015-11-23 18:18:50 TRACE Sender:223 - Received produce response from node 0 with correlation id 1
2015-11-23 18:18:50 TRACE RecordBatch:81 - Produced messages to topic-partition ArgosHeartbeat-0 with base offset offset 41 and error: null.
2015-11-23 18:18:50 DEBUG Sender:143 - Shutdown of Kafka producer I/O thread has completed.
2015-11-23 18:18:50 DEBUG KafkaProducer:429 - The Kafka producer has closed.
2015-11-23 18:18:50 INFO  ProducerConfig:113 - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
acks = 1
batch.size = 16384
reconnect.backoff.ms = 10
bootstrap.servers = [localhost:9092]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
retries = 0
max.request.size = 1048576
block.on.buffer.full = true
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 0
client.id =


In regards to above log,  we wanted to know how to set producer.type parameter to async in Kafka 8.2.2.
Also, it is unable to recognize serializer.class parameter which was a parameter in old Kafka producer config.

Regards,
Amit
Information contained in this e-mail message is confidential. This e-mail message is intended only for the personal use of the recipient(s) named above. If you are not an intended recipient, do not read, distribute or reproduce this transmission (including any attachments). If you have received this email in error, please immediately notify the sender by email reply and delete the original message.

Re: Producer property to set to enable async data transfer in kafka 8.2.2

Posted by Amit Karyekar <ak...@fanatics.com>.
Cool. Thanks Gwen for your response.

Regards,
Amit

On 11/24/15, 3:19 PM, "Gwen Shapira" <gw...@confluent.io> wrote:

>The new producer is async by default.
>
>You can see few examples of how to use it here:
>https://github.com/gwenshap/kafka-examples/tree/master/SimpleCounter/src/m
>ain/java/com/shapira/examples/producer/simplecounter
>
>On Tue, Nov 24, 2015 at 10:40 AM, Amit Karyekar <ak...@fanatics.com>
>wrote:
>
>> Hi folks,
>>
>> We are working with Kafka 8.2.2 and want to use producer.type as async
>>for
>> sending messages to broker.
>>
>> In Kakfka 8.2.2, some new producer properties have been introduced.
>> However, there is no new name for the property producer.type mentioned
>>in
>> the documentation.
>>
>> We¹ve the following configuration for Kafka producer:
>>
>>     bootstrap.servers: localhost:9092
>>
>>     serializer.class:
>> org.apache.kafka.common.serialization.StringSerializer
>>
>>     key.serializer:
>>org.apache.kafka.common.serialization.StringSerializer
>>
>>     value.serializer:
>> org.apache.kafka.common.serialization.StringSerializer
>>
>>     acks: 1
>>
>>     producer.type: async
>>
>> On sending messages, we are seeing following message in logs:
>>
>> 2015-11-23 18:18:50 TRACE KafkaProducer:158 - Starting the Kafka
>>producer
>> 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata
>>version
>> 1 to Cluster(nodes = [Node(localhost, 9092)], partitions = [])
>> 2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration
>> producer.type = null was supplied but isn't a known config.
>> 2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration
>> serializer.class = null was supplied but isn't a known config.
>> 2015-11-23 18:18:50 DEBUG KafkaProducer:231 - Kafka producer started
>> 2015-11-23 18:18:50 DEBUG Sender:117 - Starting Kafka producer I/O
>>thread.
>> 2015-11-23 18:18:50 TRACE KafkaProducer:374 - Requesting metadata update
>> for topic ArgosHeartbeat.
>> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
>> request to node -1
>> 2015-11-23 18:18:50 DEBUG NetworkClient:397 - Init connection to node -1
>> for sending metadata request in the next iteration
>> 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to
>> node -1 at localhost:9092.
>> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
>> request to node -1
>> 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to
>>node
>> -1
>> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
>> request to node -1
>> 2015-11-23 18:18:50 DEBUG NetworkClient:392 - Sending metadata request
>> ClientRequest(expectResponse=true, payload=null,
>>
>>request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,clie
>>nt_id=producer-9},
>> body={topics=[ArgosHeartbeat]})) to node -1
>> 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata
>>version
>> 2 to Cluster(nodes = [Node(0, 192.168.99.1, 9092)], partitions =
>> [Partition(topic = ArgosHeartbeat, partition = 2, leader = 0, replicas =
>> [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition = 0,
>>leader =
>> 0, replicas = [0,], isr = [0,], Partition(topic = ArgosHeartbeat,
>>partition
>> = 1, leader = 0, replicas = [0,], isr = [0,]])
>> 2015-11-23 18:18:50 TRACE KafkaProducer:337 - Sending record
>> ProducerRecord(topic=ArgosHeartbeat, partition=0, key=S109,
>>
>>value={"logType":"heartbeat","applicationType":"Java","StreamId":"S109","
>>RequestId":"R109","ArgosTimestamp":1448331530502,"Timestamp":1447896190}
>> with callback null to topic ArgosHeartbeat partition 0
>> 2015-11-23 18:18:50 TRACE RecordAccumulator:156 - Allocating a new 16384
>> byte message buffer for topic ArgosHeartbeat partition 0
>> 2015-11-23 18:18:50 TRACE KafkaProducer:340 - Waking up the sender since
>> topic ArgosHeartbeat partition 0 is either full or getting a new batch
>> 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to
>> node 0 at 192.168.99.1:9092.
>> 2015-11-23 18:18:50 TRACE KafkaProducer:419 - Closing the Kafka
>>producer.
>> 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to
>>node
>> 0
>> 2015-11-23 18:18:50 DEBUG Sender:128 - Beginning shutdown of Kafka
>> producer I/O thread, sending remaining records.
>> 2015-11-23 18:18:50 TRACE Sender:182 - Nodes with data ready to send:
>> [Node(0, 192.168.99.1, 9092)]
>> 2015-11-23 18:18:50 TRACE Sender:183 - Created 1 produce requests:
>> [ClientRequest(expectResponse=true,
>> payload={ArgosHeartbeat-0=RecordBatch(topicPartition=ArgosHeartbeat-0,
>> recordCount=1)},
>>
>>request=RequestSend(header={api_key=0,api_version=0,correlation_id=1,clie
>>nt_id=producer-9},
>>
>>body={acks=1,timeout=30000,topic_data=[{topic=ArgosHeartbeat,data=[{parti
>>tion=0,record_set=java.nio.HeapByteBuffer[pos=0
>> lim=169 cap=16384]}]}]}))]
>> 2015-11-23 18:18:50 TRACE Sender:223 - Received produce response from
>>node
>> 0 with correlation id 1
>> 2015-11-23 18:18:50 TRACE RecordBatch:81 - Produced messages to
>> topic-partition ArgosHeartbeat-0 with base offset offset 41 and error:
>>null.
>> 2015-11-23 18:18:50 DEBUG Sender:143 - Shutdown of Kafka producer I/O
>> thread has completed.
>> 2015-11-23 18:18:50 DEBUG KafkaProducer:429 - The Kafka producer has
>> closed.
>> 2015-11-23 18:18:50 INFO  ProducerConfig:113 - ProducerConfig values:
>> compression.type = none
>> metric.reporters = []
>> metadata.max.age.ms = 300000
>> metadata.fetch.timeout.ms = 60000
>> acks = 1
>> batch.size = 16384
>> reconnect.backoff.ms = 10
>> bootstrap.servers = [localhost:9092]
>> receive.buffer.bytes = 32768
>> retry.backoff.ms = 100
>> buffer.memory = 33554432
>> timeout.ms = 30000
>> key.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>> retries = 0
>> max.request.size = 1048576
>> block.on.buffer.full = true
>> value.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>> metrics.sample.window.ms = 30000
>> send.buffer.bytes = 131072
>> max.in.flight.requests.per.connection = 5
>> metrics.num.samples = 2
>> linger.ms = 0
>> client.id =
>>
>>
>> In regards to above log,  we wanted to know how to set producer.type
>> parameter to async in Kafka 8.2.2.
>> Also, it is unable to recognize serializer.class parameter which was a
>> parameter in old Kafka producer config.
>>
>> Regards,
>> Amit
>> Information contained in this e-mail message is confidential. This
>>e-mail
>> message is intended only for the personal use of the recipient(s) named
>> above. If you are not an intended recipient, do not read, distribute or
>> reproduce this transmission (including any attachments). If you have
>> received this email in error, please immediately notify the sender by
>>email
>> reply and delete the original message.
>>

Information contained in this e-mail message is confidential. This e-mail message is intended only for the personal use of the recipient(s) named above. If you are not an intended recipient, do not read, distribute or reproduce this transmission (including any attachments). If you have received this email in error, please immediately notify the sender by email reply and delete the original message.

Re: Producer property to set to enable async data transfer in kafka 8.2.2

Posted by Gwen Shapira <gw...@confluent.io>.
The new producer is async by default.

You can see few examples of how to use it here:
https://github.com/gwenshap/kafka-examples/tree/master/SimpleCounter/src/main/java/com/shapira/examples/producer/simplecounter

On Tue, Nov 24, 2015 at 10:40 AM, Amit Karyekar <ak...@fanatics.com>
wrote:

> Hi folks,
>
> We are working with Kafka 8.2.2 and want to use producer.type as async for
> sending messages to broker.
>
> In Kakfka 8.2.2, some new producer properties have been introduced.
> However, there is no new name for the property producer.type mentioned in
> the documentation.
>
> We’ve the following configuration for Kafka producer:
>
>     bootstrap.servers: localhost:9092
>
>     serializer.class:
> org.apache.kafka.common.serialization.StringSerializer
>
>     key.serializer: org.apache.kafka.common.serialization.StringSerializer
>
>     value.serializer:
> org.apache.kafka.common.serialization.StringSerializer
>
>     acks: 1
>
>     producer.type: async
>
> On sending messages, we are seeing following message in logs:
>
> 2015-11-23 18:18:50 TRACE KafkaProducer:158 - Starting the Kafka producer
> 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata version
> 1 to Cluster(nodes = [Node(localhost, 9092)], partitions = [])
> 2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration
> producer.type = null was supplied but isn't a known config.
> 2015-11-23 18:18:50 WARN  ProducerConfig:121 - The configuration
> serializer.class = null was supplied but isn't a known config.
> 2015-11-23 18:18:50 DEBUG KafkaProducer:231 - Kafka producer started
> 2015-11-23 18:18:50 DEBUG Sender:117 - Starting Kafka producer I/O thread.
> 2015-11-23 18:18:50 TRACE KafkaProducer:374 - Requesting metadata update
> for topic ArgosHeartbeat.
> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
> request to node -1
> 2015-11-23 18:18:50 DEBUG NetworkClient:397 - Init connection to node -1
> for sending metadata request in the next iteration
> 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to
> node -1 at localhost:9092.
> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
> request to node -1
> 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to node
> -1
> 2015-11-23 18:18:50 DEBUG NetworkClient:387 - Trying to send metadata
> request to node -1
> 2015-11-23 18:18:50 DEBUG NetworkClient:392 - Sending metadata request
> ClientRequest(expectResponse=true, payload=null,
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-9},
> body={topics=[ArgosHeartbeat]})) to node -1
> 2015-11-23 18:18:50 DEBUG Metadata:141 - Updated cluster metadata version
> 2 to Cluster(nodes = [Node(0, 192.168.99.1, 9092)], partitions =
> [Partition(topic = ArgosHeartbeat, partition = 2, leader = 0, replicas =
> [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition = 0, leader =
> 0, replicas = [0,], isr = [0,], Partition(topic = ArgosHeartbeat, partition
> = 1, leader = 0, replicas = [0,], isr = [0,]])
> 2015-11-23 18:18:50 TRACE KafkaProducer:337 - Sending record
> ProducerRecord(topic=ArgosHeartbeat, partition=0, key=S109,
> value={"logType":"heartbeat","applicationType":"Java","StreamId":"S109","RequestId":"R109","ArgosTimestamp":1448331530502,"Timestamp":1447896190}
> with callback null to topic ArgosHeartbeat partition 0
> 2015-11-23 18:18:50 TRACE RecordAccumulator:156 - Allocating a new 16384
> byte message buffer for topic ArgosHeartbeat partition 0
> 2015-11-23 18:18:50 TRACE KafkaProducer:340 - Waking up the sender since
> topic ArgosHeartbeat partition 0 is either full or getting a new batch
> 2015-11-23 18:18:50 DEBUG NetworkClient:415 - Initiating connection to
> node 0 at 192.168.99.1:9092.
> 2015-11-23 18:18:50 TRACE KafkaProducer:419 - Closing the Kafka producer.
> 2015-11-23 18:18:50 DEBUG NetworkClient:348 - Completed connection to node
> 0
> 2015-11-23 18:18:50 DEBUG Sender:128 - Beginning shutdown of Kafka
> producer I/O thread, sending remaining records.
> 2015-11-23 18:18:50 TRACE Sender:182 - Nodes with data ready to send:
> [Node(0, 192.168.99.1, 9092)]
> 2015-11-23 18:18:50 TRACE Sender:183 - Created 1 produce requests:
> [ClientRequest(expectResponse=true,
> payload={ArgosHeartbeat-0=RecordBatch(topicPartition=ArgosHeartbeat-0,
> recordCount=1)},
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1,client_id=producer-9},
> body={acks=1,timeout=30000,topic_data=[{topic=ArgosHeartbeat,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
> lim=169 cap=16384]}]}]}))]
> 2015-11-23 18:18:50 TRACE Sender:223 - Received produce response from node
> 0 with correlation id 1
> 2015-11-23 18:18:50 TRACE RecordBatch:81 - Produced messages to
> topic-partition ArgosHeartbeat-0 with base offset offset 41 and error: null.
> 2015-11-23 18:18:50 DEBUG Sender:143 - Shutdown of Kafka producer I/O
> thread has completed.
> 2015-11-23 18:18:50 DEBUG KafkaProducer:429 - The Kafka producer has
> closed.
> 2015-11-23 18:18:50 INFO  ProducerConfig:113 - ProducerConfig values:
> compression.type = none
> metric.reporters = []
> metadata.max.age.ms = 300000
> metadata.fetch.timeout.ms = 60000
> acks = 1
> batch.size = 16384
> reconnect.backoff.ms = 10
> bootstrap.servers = [localhost:9092]
> receive.buffer.bytes = 32768
> retry.backoff.ms = 100
> buffer.memory = 33554432
> timeout.ms = 30000
> key.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
> retries = 0
> max.request.size = 1048576
> block.on.buffer.full = true
> value.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
> metrics.sample.window.ms = 30000
> send.buffer.bytes = 131072
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> linger.ms = 0
> client.id =
>
>
> In regards to above log,  we wanted to know how to set producer.type
> parameter to async in Kafka 8.2.2.
> Also, it is unable to recognize serializer.class parameter which was a
> parameter in old Kafka producer config.
>
> Regards,
> Amit
> Information contained in this e-mail message is confidential. This e-mail
> message is intended only for the personal use of the recipient(s) named
> above. If you are not an intended recipient, do not read, distribute or
> reproduce this transmission (including any attachments). If you have
> received this email in error, please immediately notify the sender by email
> reply and delete the original message.
>