You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Artem Livshits (Jira)" <ji...@apache.org> on 2022/07/19 17:02:00 UTC
[jira] [Updated] (KAFKA-14087) Add jmh benchmark for producer with MockClient
[ https://issues.apache.org/jira/browse/KAFKA-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Artem Livshits updated KAFKA-14087:
-----------------------------------
Description:
Something like this
{code:java}
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
Time time = Time.SYSTEM;
AtomicInteger offset = new AtomicInteger(0);
MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
StringBuilder value = new StringBuilder("foo");
for (int i = 0; i < 1000; i++)
value.append("x");
AtomicInteger totalRecords = new AtomicInteger(0);
long start = time.milliseconds();
CompletableFuture[] futures = new CompletableFuture[3];
for (int i = 0; i < futures.length; i++) {
futures[i] = CompletableFuture.runAsync(() -> {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
MockClient client = new MockClient(time, metadata) {
@Override
public void send(ClientRequest request, long now) {
super.send(request, now);
if (request.apiKey() == ApiKeys.PRODUCE) {
// Prepare response data from request.
ProduceResponseData responseData = new ProduceResponseData();
ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build();
produceRequest.data().topicData().forEach(topicData ->
topicData.partitionData().forEach(partitionData -> {
String topic = topicData.name();
ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic);
if (tpr == null) {
tpr = new ProduceResponseData.TopicProduceResponse().setName(topic);
responseData.responses().add(tpr);
}
tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
.setIndex(partitionData.index())
.setRecordErrors(Collections.emptyList())
.setBaseOffset(offset.addAndGet(1))
.setLogAppendTimeMs(time.milliseconds())
.setLogStartOffset(0)
.setErrorMessage("")
.setErrorCode(Errors.NONE.code()));
}));
// Schedule a reply to come after some time to mock broker latency.
executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
}
}
};
client.updateMetadata(initialUpdateResponse);
InitProducerIdResponseData responseData = new InitProducerIdResponseData()
.setErrorCode(Errors.NONE.code())
.setProducerEpoch((short) 0)
.setProducerId(42)
.setThrottleTimeMs(0);
client.prepareResponse(body -> body instanceof InitProducerIdRequest,
new InitProducerIdResponse(responseData), false);
try (KafkaProducer<String, String> producer = kafkaProducer(
configs,
new StringSerializer(),
new StringSerializer(),
metadata,
client,
null,
time
)) {
final int records = 20_000_000;
for (int k = 0; k < records; k++) {
producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString()));
}
totalRecords.addAndGet(records);
}
});
}
for (CompletableFuture future : futures) {
future.get();
}
{code}
was:
Something like this
{code:java}
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); Time time = Time.SYSTEM;
AtomicInteger offset = new AtomicInteger(0); MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); StringBuilder value = new StringBuilder("foo");
for (int i = 0; i < 1000; i++)
value.append("x"); AtomicInteger totalRecords = new AtomicInteger(0);
long start = time.milliseconds(); CompletableFuture[] futures = new CompletableFuture[3];
for (int i = 0; i < futures.length; i++) {
futures[i] = CompletableFuture.runAsync(() -> {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
MockClient client = new MockClient(time, metadata) {
@Override
public void send(ClientRequest request, long now) {
super.send(request, now);
if (request.apiKey() == ApiKeys.PRODUCE) {
// Prepare response data from request.
ProduceResponseData responseData = new ProduceResponseData(); ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build();
produceRequest.data().topicData().forEach(topicData ->
topicData.partitionData().forEach(partitionData -> {
String topic = topicData.name();
ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic);
if (tpr == null) {
tpr = new ProduceResponseData.TopicProduceResponse().setName(topic);
responseData.responses().add(tpr);
}
tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
.setIndex(partitionData.index())
.setRecordErrors(Collections.emptyList())
.setBaseOffset(offset.addAndGet(1))
.setLogAppendTimeMs(time.milliseconds())
.setLogStartOffset(0)
.setErrorMessage("")
.setErrorCode(Errors.NONE.code()));
})); // Schedule a reply to come after some time to mock broker latency.
executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
}
}
}; client.updateMetadata(initialUpdateResponse); InitProducerIdResponseData responseData = new InitProducerIdResponseData()
.setErrorCode(Errors.NONE.code())
.setProducerEpoch((short) 0)
.setProducerId(42)
.setThrottleTimeMs(0);
client.prepareResponse(body -> body instanceof InitProducerIdRequest,
new InitProducerIdResponse(responseData), false); try (KafkaProducer<String, String> producer = kafkaProducer(
configs,
new StringSerializer(),
new StringSerializer(),
metadata,
client,
null,
time
)) {
final int records = 20_000_000; for (int k = 0; k < records; k++) {
producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString()));
} totalRecords.addAndGet(records);
}
});
} for (CompletableFuture future : futures) {
future.get();
} {code}
> Add jmh benchmark for producer with MockClient
> ----------------------------------------------
>
> Key: KAFKA-14087
> URL: https://issues.apache.org/jira/browse/KAFKA-14087
> Project: Kafka
> Issue Type: Improvement
> Components: producer
> Reporter: Artem Livshits
> Priority: Major
>
> Something like this
> {code:java}
> Map<String, Object> configs = new HashMap<>();
> configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
> configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
> Time time = Time.SYSTEM;
> AtomicInteger offset = new AtomicInteger(0);
> MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
> ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
> StringBuilder value = new StringBuilder("foo");
> for (int i = 0; i < 1000; i++)
> value.append("x");
> AtomicInteger totalRecords = new AtomicInteger(0);
> long start = time.milliseconds();
> CompletableFuture[] futures = new CompletableFuture[3];
> for (int i = 0; i < futures.length; i++) {
> futures[i] = CompletableFuture.runAsync(() -> {
> ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
> MockClient client = new MockClient(time, metadata) {
> @Override
> public void send(ClientRequest request, long now) {
> super.send(request, now);
> if (request.apiKey() == ApiKeys.PRODUCE) {
> // Prepare response data from request.
> ProduceResponseData responseData = new ProduceResponseData(); ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build();
> produceRequest.data().topicData().forEach(topicData ->
> topicData.partitionData().forEach(partitionData -> {
> String topic = topicData.name();
> ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic);
> if (tpr == null) {
> tpr = new ProduceResponseData.TopicProduceResponse().setName(topic);
> responseData.responses().add(tpr);
> }
> tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
> .setIndex(partitionData.index())
> .setRecordErrors(Collections.emptyList())
> .setBaseOffset(offset.addAndGet(1))
> .setLogAppendTimeMs(time.milliseconds())
> .setLogStartOffset(0)
> .setErrorMessage("")
> .setErrorCode(Errors.NONE.code()));
> })); // Schedule a reply to come after some time to mock broker latency.
> executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
> }
> }
> }; client.updateMetadata(initialUpdateResponse); InitProducerIdResponseData responseData = new InitProducerIdResponseData()
> .setErrorCode(Errors.NONE.code())
> .setProducerEpoch((short) 0)
> .setProducerId(42)
> .setThrottleTimeMs(0);
> client.prepareResponse(body -> body instanceof InitProducerIdRequest,
> new InitProducerIdResponse(responseData), false); try (KafkaProducer<String, String> producer = kafkaProducer(
> configs,
> new StringSerializer(),
> new StringSerializer(),
> metadata,
> client,
> null,
> time
> )) {
> final int records = 20_000_000; for (int k = 0; k < records; k++) {
> producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString()));
> } totalRecords.addAndGet(records);
> }
> });
> } for (CompletableFuture future : futures) {
> future.get();
> } {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)