Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/04/16 15:30:19 UTC

[pulsar] branch master updated: fix_msgMetadata_not_recycle (#6745)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dadb878  fix_msgMetadata_not_recycle (#6745)
dadb878 is described below

commit dadb878d176b8fb72a910880a316b2926d2d5466
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Thu Apr 16 23:30:07 2020 +0800

    fix_msgMetadata_not_recycle (#6745)
    
    **Motivation**
    Fix a leak in the encryption check: when a producer publishes to a topic that requires encryption, the broker parses a MessageMetadata object to inspect the message but never recycles it afterwards. #6744
    ```java
    if (topic.isEncryptionRequired()) {
        headersAndPayload.markReaderIndex();
        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
        headersAndPayload.resetReaderIndex();
        // Check whether the message is encrypted or not
        if (msgMetadata.getEncryptionKeysCount() < 1) {
            log.warn("[{}] Messages must be encrypted", getTopic().getName());
            cnx.ctx().channel().eventLoop().execute(() -> {
                cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
                        "Messages must be encrypted"));
                cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
            });
            return;
        }
    }
    ```
    The parsed MessageMetadata is never recycled on this code path.
    
    **Changes**
    Read the encryption key count into a local variable, recycle the MessageMetadata immediately, and run the check against the local value
    ```java
    if (topic.isEncryptionRequired()) {
        headersAndPayload.markReaderIndex();
        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
        headersAndPayload.resetReaderIndex();
        // Copy the value out first: msgMetadata must not be read after recycle()
        int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
        msgMetadata.recycle();
        // Check whether the message is encrypted or not
        if (encryptionKeysCount < 1) {
            log.warn("[{}] Messages must be encrypted", getTopic().getName());
            cnx.ctx().channel().eventLoop().execute(() -> {
                cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
                        "Messages must be encrypted"));
                cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
            });
            return;
        }
    }
    ```
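
    The "copy the value, then recycle" ordering can be illustrated in isolation. The following is only a minimal sketch, assuming a hand-rolled, single-threaded pool: `RecycleSketch`, `PooledMetadata`, `parse`, `pooledInstances`, and `isEncrypted` are hypothetical names for illustration and are not Pulsar or Netty APIs.

    ```java
    import java.util.ArrayDeque;

    public class RecycleSketch {
        // Hypothetical stand-in for a pooled metadata object (not Pulsar's MessageMetadata).
        static final class PooledMetadata {
            // Simple, non-thread-safe pool; a real broker would use a thread-aware recycler.
            private static final ArrayDeque<PooledMetadata> POOL = new ArrayDeque<>();
            private int encryptionKeysCount;

            // Reuse a pooled instance if one is available, otherwise allocate.
            static PooledMetadata parse(int keysCount) {
                PooledMetadata m = POOL.poll();
                if (m == null) {
                    m = new PooledMetadata();
                }
                m.encryptionKeysCount = keysCount;
                return m;
            }

            int getEncryptionKeysCount() {
                return encryptionKeysCount;
            }

            // Reset state and return the instance to the pool; it must not be read afterwards.
            void recycle() {
                encryptionKeysCount = 0;
                POOL.push(this);
            }

            static int pooledInstances() {
                return POOL.size();
            }
        }

        // Mirrors the ordering in the fix: copy the needed value, recycle, then decide.
        static boolean isEncrypted(int keysOnWire) {
            PooledMetadata metadata = PooledMetadata.parse(keysOnWire);
            int encryptionKeysCount = metadata.getEncryptionKeysCount();
            metadata.recycle();
            return encryptionKeysCount >= 1;
        }

        public static void main(String[] args) {
            System.out.println(isEncrypted(0));
            System.out.println(isEncrypted(2));
            System.out.println(PooledMetadata.pooledInstances());
        }
    }
    ```

    In this sketch, reading `getEncryptionKeysCount()` after `recycle()` would return the reset value, which is why the count is copied into a local variable first. Skipping `recycle()` would leave the pool empty, so every call would allocate a new object.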
    
    * fix_msgMetadata_not_recycle
    
    Co-authored-by: dezhiliu <de...@tencent.com>
---
 .../src/main/java/org/apache/pulsar/broker/service/Producer.java     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index a6cd2a5..eeb12e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -180,9 +180,10 @@ public class Producer {
             headersAndPayload.markReaderIndex();
             MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
             headersAndPayload.resetReaderIndex();
-
+            int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
+            msgMetadata.recycle();
             // Check whether the message is encrypted or not
-            if (msgMetadata.getEncryptionKeysCount() < 1) {
+            if (encryptionKeysCount < 1) {
                 log.warn("[{}] Messages must be encrypted", getTopic().getName());
                 cnx.ctx().channel().eventLoop().execute(() -> {
                     cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,