You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/20 17:50:00 UTC
[pulsar] branch master updated: Avoid some promise object allocations when writing on channel (#16113)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e604348c94a Avoid some promise object allocations when writing on channel (#16113)
e604348c94a is described below
commit e604348c94afe240f5f82ccace24346d8bfe0bdc
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 20 10:49:49 2022 -0700
Avoid some promise object allocations when writing on channel (#16113)
---
.../broker/service/PulsarCommandSenderImpl.java | 48 +++++++++++-----------
1 file changed, 24 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 4cdc0a76e8c..57dcaff250d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -54,7 +54,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newPartitionMetadataResponseCommand(error, errorMsg, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -62,7 +62,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newPartitionMetadataResponseCommand(partitions, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -70,7 +70,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newSuccessCommand(requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -78,7 +78,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newErrorCommand(requestId, error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -86,7 +86,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, schemaVersion);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -97,7 +97,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
schemaVersion, topicEpoch, isProducerReady);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -107,7 +107,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
entryId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -115,7 +115,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newSendErrorCommand(producerId, sequenceId, error, errorMsg);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -125,7 +125,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
filtered, changed, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -133,7 +133,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newGetSchemaResponseCommand(requestId, schema, version);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -141,7 +141,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newGetSchemaResponseErrorCommand(requestId, error, errorMessage);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -149,7 +149,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newGetOrCreateSchemaResponseCommand(requestId, schemaVersion);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -158,7 +158,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
Commands.newGetOrCreateSchemaResponseErrorCommand(requestId, error, errorMessage);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -166,7 +166,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newConnectedCommand(clientProtocolVersion, maxMessageSize);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -177,7 +177,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
authoritative, response, requestId, proxyThroughServiceUrl);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -185,7 +185,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newLookupErrorResponseCommand(error, errorMsg, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -201,12 +201,12 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
@Override
public void sendSuccess(long requestId) {
- cnx.ctx().writeAndFlush(Commands.newSuccess(requestId));
+ cnx.ctx().writeAndFlush(Commands.newSuccess(requestId), cnx.ctx().voidPromise());
}
@Override
public void sendError(long requestId, ServerError error, String message) {
- cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message));
+ cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message), cnx.ctx().voidPromise());
}
@Override
@@ -214,7 +214,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
// Only send notification if the client understand the command
if (cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v9.getValue()) {
log.info("[{}] Notifying consumer that end of topic has been reached", this);
- cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
+ cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId), cnx.ctx().voidPromise());
}
}
@@ -295,7 +295,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newTcClientConnectResponse(requestId, error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -309,7 +309,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
txnID.getMostSigBits());
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
if (this.interceptor != null) {
this.interceptor.txnOpened(tcID, txnID.toString());
}
@@ -320,7 +320,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newTxnResponse(requestId, txnID, error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
}
@Override
@@ -329,7 +329,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
txnID.getMostSigBits());
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
if (this.interceptor != null) {
this.interceptor.txnEnded(txnID.toString(), txnAction);
}
@@ -340,7 +340,7 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
BaseCommand command = Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
if (this.interceptor != null) {
this.interceptor.txnEnded(txnID.toString(), TxnAction.ABORT_VALUE);
}