You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:56 UTC
[pulsar] 10/38: fix_msgMetadata_not_recycle (#6745)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8c2d1cf5251a16404437ed779ea09ca3062e7cb0
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Thu Apr 16 23:30:07 2020 +0800
fix_msgMetadata_not_recycle (#6745)
**Motivation**
fix when producing encrypted messages for inspection, MessageMetadata objects are not released after they are created. #6744
```javascript
if (topic.isEncryptionRequired()) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
// Check whether the message is encrypted or not
if (msgMetadata.getEncryptionKeysCount() < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted"));
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
}
}
}
```
MessageMetadata was not recycled
**Changes**
Replace old value with new value
```javascript
if (topic.isEncryptionRequired()) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
metadata.recycle();
// Check whether the message is encrypted or not
if (encryptionKeysCount < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted"));
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
}
}
```
* fix_msgMetadata_not_recycle
Co-authored-by: dezhiliu <de...@tencent.com>(cherry picked from commit dadb878d176b8fb72a910880a316b2926d2d5466)
---
.../src/main/java/org/apache/pulsar/broker/service/Producer.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index a7c97da..c6cbada 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -180,9 +180,10 @@ public class Producer {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
-
+ int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
+ msgMetadata.recycle();
// Check whether the message is encrypted or not
- if (msgMetadata.getEncryptionKeysCount() < 1) {
+ if (encryptionKeysCount < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,