You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/20 17:50:00 UTC

[pulsar] branch master updated: Avoid some promise object allocations when writing on channel (#16113)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e604348c94a Avoid some promise object allocations when writing on channel (#16113)
e604348c94a is described below

commit e604348c94afe240f5f82ccace24346d8bfe0bdc
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 20 10:49:49 2022 -0700

    Avoid some promise object allocations when writing on channel (#16113)
---
 .../broker/service/PulsarCommandSenderImpl.java    | 48 +++++++++++-----------
 1 file changed, 24 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 4cdc0a76e8c..57dcaff250d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -54,7 +54,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newPartitionMetadataResponseCommand(error, errorMsg, requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -62,7 +62,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newPartitionMetadataResponseCommand(partitions, requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -70,7 +70,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newSuccessCommand(requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -78,7 +78,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newErrorCommand(requestId, error, message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -86,7 +86,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, schemaVersion);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -97,7 +97,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
                 schemaVersion, topicEpoch, isProducerReady);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -107,7 +107,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
                 entryId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -115,7 +115,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newSendErrorCommand(producerId, sequenceId, error, errorMsg);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -125,7 +125,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
                 filtered, changed, requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -133,7 +133,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newGetSchemaResponseCommand(requestId, schema, version);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -141,7 +141,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newGetSchemaResponseErrorCommand(requestId, error, errorMessage);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -149,7 +149,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newGetOrCreateSchemaResponseCommand(requestId, schemaVersion);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -158,7 +158,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
                 Commands.newGetOrCreateSchemaResponseErrorCommand(requestId, error, errorMessage);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -166,7 +166,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newConnectedCommand(clientProtocolVersion, maxMessageSize);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -177,7 +177,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
                 authoritative, response, requestId, proxyThroughServiceUrl);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -185,7 +185,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newLookupErrorResponseCommand(error, errorMsg, requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -201,12 +201,12 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
 
     @Override
     public void sendSuccess(long requestId) {
-        cnx.ctx().writeAndFlush(Commands.newSuccess(requestId));
+        cnx.ctx().writeAndFlush(Commands.newSuccess(requestId), cnx.ctx().voidPromise());
     }
 
     @Override
     public void sendError(long requestId, ServerError error, String message) {
-        cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message));
+        cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message), cnx.ctx().voidPromise());
     }
 
     @Override
@@ -214,7 +214,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         // Only send notification if the client understand the command
         if (cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v9.getValue()) {
             log.info("[{}] Notifying consumer that end of topic has been reached", this);
-            cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
+            cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId), cnx.ctx().voidPromise());
         }
     }
 
@@ -295,7 +295,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newTcClientConnectResponse(requestId, error, message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -309,7 +309,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
                 txnID.getMostSigBits());
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
         if (this.interceptor != null) {
             this.interceptor.txnOpened(tcID, txnID.toString());
         }
@@ -320,7 +320,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newTxnResponse(requestId, txnID, error, message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
     }
 
     @Override
@@ -329,7 +329,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
                 txnID.getMostSigBits());
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
         if (this.interceptor != null) {
             this.interceptor.txnEnded(txnID.toString(), txnAction);
         }
@@ -340,7 +340,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
         BaseCommand command = Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), error, message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
         if (this.interceptor != null) {
             this.interceptor.txnEnded(txnID.toString(), TxnAction.ABORT_VALUE);
         }