You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/04/16 15:30:19 UTC
[pulsar] branch master updated: fix_msgMetadata_not_recycle (#6745)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dadb878 fix_msgMetadata_not_recycle (#6745)
dadb878 is described below
commit dadb878d176b8fb72a910880a316b2926d2d5466
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Thu Apr 16 23:30:07 2020 +0800
fix_msgMetadata_not_recycle (#6745)
**Motivation**
fix when producing encrypted messages for inspection, MessageMetadata objects are not released after they are created. #6744
```javascript
if (topic.isEncryptionRequired()) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
// Check whether the message is encrypted or not
if (msgMetadata.getEncryptionKeysCount() < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted"));
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
}
}
}
```
MessageMetadata was not recycled
**Changes**
Replace old value with new value
```javascript
if (topic.isEncryptionRequired()) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
metadata.recycle();
// Check whether the message is encrypted or not
if (encryptionKeysCount < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted"));
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
}
}
```
* fix_msgMetadata_not_recycle
Co-authored-by: dezhiliu <de...@tencent.com>
---
.../src/main/java/org/apache/pulsar/broker/service/Producer.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index a6cd2a5..eeb12e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -180,9 +180,10 @@ public class Producer {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
-
+ int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
+ msgMetadata.recycle();
// Check whether the message is encrypted or not
- if (msgMetadata.getEncryptionKeysCount() < 1) {
+ if (encryptionKeysCount < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,