You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Artem Livshits (Jira)" <ji...@apache.org> on 2022/07/19 17:02:00 UTC

[jira] [Updated] (KAFKA-14087) Add jmh benchmark for producer with MockClient

     [ https://issues.apache.org/jira/browse/KAFKA-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Artem Livshits updated KAFKA-14087:
-----------------------------------
    Description: 
Something like this
{code:java}
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        Time time = Time.SYSTEM;
        AtomicInteger offset = new AtomicInteger(0);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
        StringBuilder value = new StringBuilder("foo");
        for (int i = 0; i < 1000; i++)
            value.append("x");
        AtomicInteger totalRecords = new AtomicInteger(0);
        long start = time.milliseconds();
        CompletableFuture[] futures = new CompletableFuture[3];
        for (int i = 0; i < futures.length; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
                MockClient client = new MockClient(time, metadata) {
                    @Override
                    public void send(ClientRequest request, long now) {
                        super.send(request, now);
                        if (request.apiKey() == ApiKeys.PRODUCE) {
                            // Prepare response data from request.
                            ProduceResponseData responseData = new ProduceResponseData();                            ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build();
                            produceRequest.data().topicData().forEach(topicData ->
                                    topicData.partitionData().forEach(partitionData -> {
                                        String topic = topicData.name();
                                        ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic);
                                        if (tpr == null) {
                                            tpr = new ProduceResponseData.TopicProduceResponse().setName(topic);
                                            responseData.responses().add(tpr);
                                        }
                                        tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
                                                .setIndex(partitionData.index())
                                                .setRecordErrors(Collections.emptyList())
                                                .setBaseOffset(offset.addAndGet(1))
                                                .setLogAppendTimeMs(time.milliseconds())
                                                .setLogStartOffset(0)
                                                .setErrorMessage("")
                                                .setErrorCode(Errors.NONE.code()));
                                    }));                            // Schedule a reply to come after some time to mock broker latency.
                            executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
                        }
                    }
                };                client.updateMetadata(initialUpdateResponse);                InitProducerIdResponseData responseData = new InitProducerIdResponseData()
                        .setErrorCode(Errors.NONE.code())
                        .setProducerEpoch((short) 0)
                        .setProducerId(42)
                        .setThrottleTimeMs(0);
                client.prepareResponse(body -> body instanceof InitProducerIdRequest,
                        new InitProducerIdResponse(responseData), false);                try (KafkaProducer<String, String> producer = kafkaProducer(
                        configs,
                        new StringSerializer(),
                        new StringSerializer(),
                        metadata,
                        client,
                        null,
                        time
                )) {
                    final int records = 20_000_000;                    for (int k = 0; k < records; k++) {
                        producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString()));
                    }                    totalRecords.addAndGet(records);
                }
            });
        }        for (CompletableFuture future : futures) {
            future.get();
        } {code}
 

  was:
Something like this
{code:java}
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");        Time time = Time.SYSTEM;
        AtomicInteger offset = new AtomicInteger(0);        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);        StringBuilder value = new StringBuilder("foo");
        for (int i = 0; i < 1000; i++)
            value.append("x");        AtomicInteger totalRecords = new AtomicInteger(0);
        long start = time.milliseconds();        CompletableFuture[] futures = new CompletableFuture[3];
        for (int i = 0; i < futures.length; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
                MockClient client = new MockClient(time, metadata) {
                    @Override
                    public void send(ClientRequest request, long now) {
                        super.send(request, now);
                        if (request.apiKey() == ApiKeys.PRODUCE) {
                            // Prepare response data from request.
                            ProduceResponseData responseData = new ProduceResponseData();                            ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build();
                            produceRequest.data().topicData().forEach(topicData ->
                                    topicData.partitionData().forEach(partitionData -> {
                                        String topic = topicData.name();
                                        ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic);
                                        if (tpr == null) {
                                            tpr = new ProduceResponseData.TopicProduceResponse().setName(topic);
                                            responseData.responses().add(tpr);
                                        }
                                        tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
                                                .setIndex(partitionData.index())
                                                .setRecordErrors(Collections.emptyList())
                                                .setBaseOffset(offset.addAndGet(1))
                                                .setLogAppendTimeMs(time.milliseconds())
                                                .setLogStartOffset(0)
                                                .setErrorMessage("")
                                                .setErrorCode(Errors.NONE.code()));
                                    }));                            // Schedule a reply to come after some time to mock broker latency.
                            executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
                        }
                    }
                };                client.updateMetadata(initialUpdateResponse);                InitProducerIdResponseData responseData = new InitProducerIdResponseData()
                        .setErrorCode(Errors.NONE.code())
                        .setProducerEpoch((short) 0)
                        .setProducerId(42)
                        .setThrottleTimeMs(0);
                client.prepareResponse(body -> body instanceof InitProducerIdRequest,
                        new InitProducerIdResponse(responseData), false);                try (KafkaProducer<String, String> producer = kafkaProducer(
                        configs,
                        new StringSerializer(),
                        new StringSerializer(),
                        metadata,
                        client,
                        null,
                        time
                )) {
                    final int records = 20_000_000;                    for (int k = 0; k < records; k++) {
                        producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString()));
                    }                    totalRecords.addAndGet(records);
                }
            });
        }        for (CompletableFuture future : futures) {
            future.get();
        } {code}


> Add jmh benchmark for producer with MockClient
> ----------------------------------------------
>
>                 Key: KAFKA-14087
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14087
>             Project: Kafka
>          Issue Type: Improvement
>          Components: producer 
>            Reporter: Artem Livshits
>            Priority: Major
>
> Something like this
> {code:java}
>         Map<String, Object> configs = new HashMap<>();
>         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
>         configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
>         Time time = Time.SYSTEM;
>         AtomicInteger offset = new AtomicInteger(0);
>         MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
>         ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
>         StringBuilder value = new StringBuilder("foo");
>         for (int i = 0; i < 1000; i++)
>             value.append("x");
>         AtomicInteger totalRecords = new AtomicInteger(0);
>         long start = time.milliseconds();
>         CompletableFuture[] futures = new CompletableFuture[3];
>         for (int i = 0; i < futures.length; i++) {
>             futures[i] = CompletableFuture.runAsync(() -> {
>                 ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
>                 MockClient client = new MockClient(time, metadata) {
>                     @Override
>                     public void send(ClientRequest request, long now) {
>                         super.send(request, now);
>                         if (request.apiKey() == ApiKeys.PRODUCE) {
>                             // Prepare response data from request.
>                             ProduceResponseData responseData = new ProduceResponseData();                            ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build();
>                             produceRequest.data().topicData().forEach(topicData ->
>                                     topicData.partitionData().forEach(partitionData -> {
>                                         String topic = topicData.name();
>                                         ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic);
>                                         if (tpr == null) {
>                                             tpr = new ProduceResponseData.TopicProduceResponse().setName(topic);
>                                             responseData.responses().add(tpr);
>                                         }
>                                         tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
>                                                 .setIndex(partitionData.index())
>                                                 .setRecordErrors(Collections.emptyList())
>                                                 .setBaseOffset(offset.addAndGet(1))
>                                                 .setLogAppendTimeMs(time.milliseconds())
>                                                 .setLogStartOffset(0)
>                                                 .setErrorMessage("")
>                                                 .setErrorCode(Errors.NONE.code()));
>                                     }));                            // Schedule a reply to come after some time to mock broker latency.
>                             executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
>                         }
>                     }
>                 };                client.updateMetadata(initialUpdateResponse);                InitProducerIdResponseData responseData = new InitProducerIdResponseData()
>                         .setErrorCode(Errors.NONE.code())
>                         .setProducerEpoch((short) 0)
>                         .setProducerId(42)
>                         .setThrottleTimeMs(0);
>                 client.prepareResponse(body -> body instanceof InitProducerIdRequest,
>                         new InitProducerIdResponse(responseData), false);                try (KafkaProducer<String, String> producer = kafkaProducer(
>                         configs,
>                         new StringSerializer(),
>                         new StringSerializer(),
>                         metadata,
>                         client,
>                         null,
>                         time
>                 )) {
>                     final int records = 20_000_000;                    for (int k = 0; k < records; k++) {
>                         producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString()));
>                     }                    totalRecords.addAndGet(records);
>                 }
>             });
>         }        for (CompletableFuture future : futures) {
>             future.get();
>         } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)