Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/26 07:28:24 UTC

[GitHub] [pulsar] liangyepianzhou commented on a change in pull request #11933: [Transaction] Add transaction perf

liangyepianzhou commented on a change in pull request #11933:
URL: https://github.com/apache/pulsar/pull/11933#discussion_r716157771



##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -340,9 +344,123 @@ public static void main(String[] args) throws Exception {
         if (isNotBlank(arguments.listenerName)) {
             clientBuilder.listenerName(arguments.listenerName);
         }
-
         PulsarClient pulsarClient = clientBuilder.build();
 
+        AtomicReference<Transaction> atomicReference = buildTransaction(pulsarClient, arguments.isEnableTransaction,
+                arguments.transactionTimeout);
+
+        AtomicLong messageAckedCount = new AtomicLong();
+        Semaphore messageReceiveLimiter = new Semaphore(arguments.numMessagesPerTransaction);
+        Thread thread = Thread.currentThread();
+        MessageListener<ByteBuffer> listener = (consumer, msg) -> {
+                if(arguments.isEnableTransaction){
+                    try {
+                        messageReceiveLimiter.acquire();
+                    }catch (InterruptedException e){
+                        log.error("Got error: ", e);
+                    }
+                    }
+                if (arguments.testTime > 0) {
+                    if (System.nanoTime() > testEndTime) {
+                        log.info("------------------- DONE -----------------------");
+                        printAggregatedStats();
+                        PerfClientUtils.exit(0);
+                        thread.interrupt();
+                    }
+                }
+                if(arguments.totalNumTxn > 0) {
+                    if (totalEndTxnOpFailNum.sum() + totalEndTxnOpSuccessNum.sum() >= arguments.totalNumTxn) {
+                        log.info("------------------- DONE -----------------------");
+                        printAggregatedStats();
+                        PerfClientUtils.exit(0);
+                        thread.interrupt();
+                    }
+                }
+                messagesReceived.increment();
+                bytesReceived.add(msg.size());
+
+                totalMessagesReceived.increment();
+                totalBytesReceived.add(msg.size());
+
+                if (limiter != null) {
+                    limiter.acquire();
+                }
+
+                long latencyMillis = System.currentTimeMillis() - msg.getPublishTime();
+                if (latencyMillis >= 0) {
+                    recorder.recordValue(latencyMillis);
+                    cumulativeRecorder.recordValue(latencyMillis);
+                }
+                if (arguments.isEnableTransaction) {
+                    consumer.acknowledgeAsync(msg.getMessageId(), atomicReference.get()).thenRun(() -> {
+                        totalMessageAck.increment();
+                        messageAck.increment();
+                    }).exceptionally(throwable ->{
+                        log.error("Ack message {} failed with exception", msg, throwable);
+                        totalMessageAckFailed.increment();
+                        return null;
+                    });
+                } else {
+                    consumer.acknowledgeAsync(msg).thenRun(()->{
+                        totalMessageAck.increment();
+                        messageAck.increment();
+                    }
+                    ).exceptionally(throwable ->{
+                                log.error("Ack message {} failed with exception", msg, throwable);
+                                totalMessageAckFailed.increment();
+                                return null;
+                            }
+                    );
+                }
+                if(arguments.poolMessages) {
+                    msg.release();
+                }
+                if (arguments.isEnableTransaction

Review comment:
       It is not good to put the message ack and the transaction commit together in one block. Written this way, the `if` ends up too far from its `else`, and the code is not cleanly separated into steps. Consider splitting the ack and the commit into separate pieces.
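
       One possible shape for that split, as a minimal sketch rather than the actual change for this PR. It uses only the JDK: `Txn`, `Acker`, `ackMessage`, `commitIfBatchComplete`, and the three counters are hypothetical stand-ins invented for illustration, not the Pulsar `Transaction` or `Consumer` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class AckCommitSketch {
    /** Hypothetical stand-in for a transaction; NOT the Pulsar Transaction API. */
    interface Txn {
        CompletableFuture<Void> commit();
    }

    /** Hypothetical stand-in for the consumer's two ack variants. */
    interface Acker {
        CompletableFuture<Void> ack(String messageId);
        CompletableFuture<Void> ack(String messageId, Txn txn);
    }

    // Hypothetical counters, playing the role of the perf tool's statistics.
    static final AtomicLong acked = new AtomicLong();
    static final AtomicLong ackFailed = new AtomicLong();
    static final AtomicLong committed = new AtomicLong();

    // Step 1: acknowledge only. The transactional and plain branches sit next
    // to each other, and the success/failure bookkeeping is written once.
    static CompletableFuture<Void> ackMessage(Acker acker, String messageId, Txn txn) {
        CompletableFuture<Void> ack =
                (txn != null) ? acker.ack(messageId, txn) : acker.ack(messageId);
        return ack.thenRun(acked::incrementAndGet)
                .exceptionally(t -> {
                    ackFailed.incrementAndGet();
                    return null;
                });
    }

    // Step 2: commit only, and only when the transaction has received its
    // configured number of messages. Returns true if a commit was issued.
    static boolean commitIfBatchComplete(Txn txn, long ackedInTxn, long messagesPerTxn) {
        if (txn == null || ackedInTxn < messagesPerTxn) {
            return false;
        }
        txn.commit().thenRun(committed::incrementAndGet);
        return true;
    }

    public static void main(String[] args) {
        // In-memory fakes that complete immediately, so the sketch runs standalone.
        Acker acker = new Acker() {
            public CompletableFuture<Void> ack(String messageId) {
                return CompletableFuture.completedFuture(null);
            }
            public CompletableFuture<Void> ack(String messageId, Txn txn) {
                return CompletableFuture.completedFuture(null);
            }
        };
        Txn txn = () -> CompletableFuture.completedFuture(null);

        // The listener body then reads as two short, separate steps per message.
        for (int i = 1; i <= 3; i++) {
            ackMessage(acker, "m" + i, txn).join();
            commitIfBatchComplete(txn, i, 3);
        }
        System.out.println(acked.get() + " acked, " + committed.get() + " committed");
        // prints "3 acked, 1 committed"
    }
}
```

       With this layout the `if`/`else` that picks the ack variant stays within a few lines, and the commit condition lives in its own method instead of trailing the ack block.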



