You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/04 13:05:00 UTC

[pulsar] 02/02: [fix][sql]Fix presto sql avro decode error when publish non-batched msgs (#17093)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 102735f44080c56743a8d9421fab5b2db9014e8f
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Fri Aug 19 07:44:45 2022 +0800

    [fix][sql]Fix presto sql avro decode error when publish non-batched msgs (#17093)
    
    (cherry picked from commit 4b98b23d47c5f35c2fb11b3687f139a02e2cb458)
---
 .../pulsar/common/api/raw/RawMessageImpl.java      | 11 +++--
 .../pulsar/common/api/raw/RawMessageImplTest.java  | 55 +++++++++++++++++++---
 2 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 9f1d493247f..1424af0c359 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -37,6 +37,7 @@ public class RawMessageImpl implements RawMessage {
 
     private ReferenceCountedMessageMetadata msgMetadata;
     private final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+    private volatile boolean setSingleMessageMetadata;
     private ByteBuf payload;
 
     private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
@@ -57,6 +58,7 @@ public class RawMessageImpl implements RawMessage {
         msgMetadata.release();
         msgMetadata = null;
         singleMessageMetadata.clear();
+        setSingleMessageMetadata = false;
 
         payload.release();
         handle.recycle(this);
@@ -72,6 +74,7 @@ public class RawMessageImpl implements RawMessage {
 
         if (singleMessageMetadata != null) {
             msg.singleMessageMetadata.copyFrom(singleMessageMetadata);
+            msg.setSingleMessageMetadata = true;
         }
         msg.messageId.ledgerId = ledgerId;
         msg.messageId.entryId = entryId;
@@ -90,7 +93,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public Map<String, String> getProperties() {
-        if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
+        if (setSingleMessageMetadata && singleMessageMetadata.getPropertiesCount() > 0) {
             return singleMessageMetadata.getPropertiesList().stream()
                       .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
                               (oldValue, newValue) -> newValue));
@@ -119,7 +122,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public long getEventTime() {
-        if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) {
+        if (setSingleMessageMetadata && singleMessageMetadata.hasEventTime()) {
             return singleMessageMetadata.getEventTime();
         } else if (msgMetadata.getMetadata().hasEventTime()) {
             return msgMetadata.getMetadata().getEventTime();
@@ -140,7 +143,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public Optional<String> getKey() {
-        if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) {
+        if (setSingleMessageMetadata && singleMessageMetadata.hasPartitionKey()) {
             return Optional.of(singleMessageMetadata.getPartitionKey());
         } else if (msgMetadata.getMetadata().hasPartitionKey()){
             return Optional.of(msgMetadata.getMetadata().getPartitionKey());
@@ -171,7 +174,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public boolean hasBase64EncodedKey() {
-        if (singleMessageMetadata != null) {
+        if (setSingleMessageMetadata) {
             return singleMessageMetadata.isPartitionKeyB64Encoded();
         }
         return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
index 99a5f02a2e2..e7270e4546f 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
@@ -18,15 +18,20 @@
  */
 package org.apache.pulsar.common.api.raw;
 
+import static java.util.Collections.singletonList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-
 public class RawMessageImplTest {
 
     private static final String HARD_CODE_KEY = "__pfn_input_topic__";
@@ -38,7 +43,7 @@ public class RawMessageImplTest {
     @Test
     public void testGetProperties() {
         ReferenceCountedMessageMetadata refCntMsgMetadata =
-                ReferenceCountedMessageMetadata.get(Mockito.mock(ByteBuf.class));
+                ReferenceCountedMessageMetadata.get(mock(ByteBuf.class));
         SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
         singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_FIRST);
         singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_SECOND);
@@ -50,4 +55,42 @@ public class RawMessageImplTest {
         assertEquals(KEY_VALUE_SECOND, properties.get(HARD_CODE_KEY));
         assertEquals(HARD_CODE_KEY_ID_VALUE, properties.get(HARD_CODE_KEY_ID));
     }
+
+    @Test
+    public void testNonBatchedMessage() {
+        MessageMetadata messageMetadata = new MessageMetadata();
+        messageMetadata.setPartitionKeyB64Encoded(true);
+        messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
+        messageMetadata.setEventTime(100L);
+
+        ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
+        when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+        // A non-batched message has no SingleMessageMetadata (null); getters must fall back to MessageMetadata
+        RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, null, null, 0, 0, 0);
+        assertTrue(msg.hasBase64EncodedKey());
+        assertEquals(msg.getProperties(), ImmutableMap.of("key1", "value1"));
+        assertEquals(msg.getEventTime(), 100L);
+    }
+
+    @Test
+    public void testBatchedMessage() {
+        MessageMetadata messageMetadata = new MessageMetadata();
+        messageMetadata.setPartitionKeyB64Encoded(true);
+        messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
+        messageMetadata.setEventTime(100L);
+
+        ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
+        when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        singleMessageMetadata.setPartitionKeyB64Encoded(false);
+        singleMessageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key2").setValue("value2")));
+        singleMessageMetadata.setEventTime(200L);
+
+        RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null, 0, 0, 0);
+        assertFalse(msg.hasBase64EncodedKey());
+        assertEquals(msg.getProperties(), ImmutableMap.of("key2", "value2"));
+        assertEquals(msg.getEventTime(), 200L);
+    }
 }
\ No newline at end of file