You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:56 UTC

[pulsar] 10/38: fix_msgMetadata_not_recycle (#6745)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8c2d1cf5251a16404437ed779ea09ca3062e7cb0
Author: liudezhi <33...@users.noreply.github.com>
AuthorDate: Thu Apr 16 23:30:07 2020 +0800

    fix_msgMetadata_not_recycle (#6745)
    
    **Motivation**
    fix when producing encrypted messages for inspection, MessageMetadata objects are not released after they are created. #6744
    ```javascript
    if (topic.isEncryptionRequired()) {
    
                headersAndPayload.markReaderIndex();
                MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
                headersAndPayload.resetReaderIndex();
                // Check whether the message is encrypted or not
                if (msgMetadata.getEncryptionKeysCount() < 1) {
                    log.warn("[{}] Messages must be encrypted", getTopic().getName());
                    cnx.ctx().channel().eventLoop().execute(() -> {
                        cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
                                "Messages must be encrypted"));
                        cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
                    });
                    return;
                }
            }
        }
    ```
    MessageMetadata was not recycled
    
    **Changes**
    Replace old value with new value
    ```javascript
     if (topic.isEncryptionRequired()) {
    
                headersAndPayload.markReaderIndex();
                MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
                headersAndPayload.resetReaderIndex();
                int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
                metadata.recycle();
                // Check whether the message is encrypted or not
                if (encryptionKeysCount < 1) {
                    log.warn("[{}] Messages must be encrypted", getTopic().getName());
                    cnx.ctx().channel().eventLoop().execute(() -> {
                        cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
                                "Messages must be encrypted"));
                        cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
                    });
                    return;
                }
            }
    ```
    
    * fix_msgMetadata_not_recycle
    
    Co-authored-by: dezhiliu <de...@tencent.com>(cherry picked from commit dadb878d176b8fb72a910880a316b2926d2d5466)
---
 .../src/main/java/org/apache/pulsar/broker/service/Producer.java     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index a7c97da..c6cbada 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -180,9 +180,10 @@ public class Producer {
             headersAndPayload.markReaderIndex();
             MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
             headersAndPayload.resetReaderIndex();
-
+            int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
+            msgMetadata.recycle();
             // Check whether the message is encrypted or not
-            if (msgMetadata.getEncryptionKeysCount() < 1) {
+            if (encryptionKeysCount < 1) {
                 log.warn("[{}] Messages must be encrypted", getTopic().getName());
                 cnx.ctx().channel().eventLoop().execute(() -> {
                     cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,