You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/04 13:05:00 UTC
[pulsar] 02/02: [fix][sql]Fix presto sql avro decode error when publish non-batched msgs (#17093)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 102735f44080c56743a8d9421fab5b2db9014e8f
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Fri Aug 19 07:44:45 2022 +0800
[fix][sql]Fix presto sql avro decode error when publish non-batched msgs (#17093)
(cherry picked from commit 4b98b23d47c5f35c2fb11b3687f139a02e2cb458)
---
.../pulsar/common/api/raw/RawMessageImpl.java | 11 +++--
.../pulsar/common/api/raw/RawMessageImplTest.java | 55 +++++++++++++++++++---
2 files changed, 56 insertions(+), 10 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 9f1d493247f..1424af0c359 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -37,6 +37,7 @@ public class RawMessageImpl implements RawMessage {
private ReferenceCountedMessageMetadata msgMetadata;
private final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+ private volatile boolean setSingleMessageMetadata;
private ByteBuf payload;
private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
@@ -57,6 +58,7 @@ public class RawMessageImpl implements RawMessage {
msgMetadata.release();
msgMetadata = null;
singleMessageMetadata.clear();
+ setSingleMessageMetadata = false;
payload.release();
handle.recycle(this);
@@ -72,6 +74,7 @@ public class RawMessageImpl implements RawMessage {
if (singleMessageMetadata != null) {
msg.singleMessageMetadata.copyFrom(singleMessageMetadata);
+ msg.setSingleMessageMetadata = true;
}
msg.messageId.ledgerId = ledgerId;
msg.messageId.entryId = entryId;
@@ -90,7 +93,7 @@ public class RawMessageImpl implements RawMessage {
@Override
public Map<String, String> getProperties() {
- if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
+ if (setSingleMessageMetadata && singleMessageMetadata.getPropertiesCount() > 0) {
return singleMessageMetadata.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
(oldValue, newValue) -> newValue));
@@ -119,7 +122,7 @@ public class RawMessageImpl implements RawMessage {
@Override
public long getEventTime() {
- if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) {
+ if (setSingleMessageMetadata && singleMessageMetadata.hasEventTime()) {
return singleMessageMetadata.getEventTime();
} else if (msgMetadata.getMetadata().hasEventTime()) {
return msgMetadata.getMetadata().getEventTime();
@@ -140,7 +143,7 @@ public class RawMessageImpl implements RawMessage {
@Override
public Optional<String> getKey() {
- if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) {
+ if (setSingleMessageMetadata && singleMessageMetadata.hasPartitionKey()) {
return Optional.of(singleMessageMetadata.getPartitionKey());
} else if (msgMetadata.getMetadata().hasPartitionKey()){
return Optional.of(msgMetadata.getMetadata().getPartitionKey());
@@ -171,7 +174,7 @@ public class RawMessageImpl implements RawMessage {
@Override
public boolean hasBase64EncodedKey() {
- if (singleMessageMetadata != null) {
+ if (setSingleMessageMetadata) {
return singleMessageMetadata.isPartitionKeyB64Encoded();
}
return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
index 99a5f02a2e2..e7270e4546f 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
@@ -18,15 +18,20 @@
*/
package org.apache.pulsar.common.api.raw;
+import static java.util.Collections.singletonList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
-import org.mockito.Mockito;
import org.testng.annotations.Test;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-
public class RawMessageImplTest {
private static final String HARD_CODE_KEY = "__pfn_input_topic__";
@@ -38,7 +43,7 @@ public class RawMessageImplTest {
@Test
public void testGetProperties() {
ReferenceCountedMessageMetadata refCntMsgMetadata =
- ReferenceCountedMessageMetadata.get(Mockito.mock(ByteBuf.class));
+ ReferenceCountedMessageMetadata.get(mock(ByteBuf.class));
SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_FIRST);
singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_SECOND);
@@ -50,4 +55,42 @@ public class RawMessageImplTest {
assertEquals(KEY_VALUE_SECOND, properties.get(HARD_CODE_KEY));
assertEquals(HARD_CODE_KEY_ID_VALUE, properties.get(HARD_CODE_KEY_ID));
}
+
+ @Test
+ public void testNonBatchedMessage() {
+ MessageMetadata messageMetadata = new MessageMetadata();
+ messageMetadata.setPartitionKeyB64Encoded(true);
+ messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
+ messageMetadata.setEventTime(100L);
+
+ ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
+ when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+ // A non-batched message has no SingleMessageMetadata, so null is passed; the getters must fall back to the MessageMetadata
+ RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, null, null, 0, 0, 0);
+ assertTrue(msg.hasBase64EncodedKey());
+ assertEquals(msg.getProperties(), ImmutableMap.of("key1", "value1"));
+ assertEquals(msg.getEventTime(), 100L);
+ }
+
+ @Test
+ public void testBatchedMessage() {
+ MessageMetadata messageMetadata = new MessageMetadata();
+ messageMetadata.setPartitionKeyB64Encoded(true);
+ messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
+ messageMetadata.setEventTime(100L);
+
+ ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
+ when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+ SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+ singleMessageMetadata.setPartitionKeyB64Encoded(false);
+ singleMessageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key2").setValue("value2")));
+ singleMessageMetadata.setEventTime(200L);
+
+ RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null, 0, 0, 0);
+ assertFalse(msg.hasBase64EncodedKey());
+ assertEquals(msg.getProperties(), ImmutableMap.of("key2", "value2"));
+ assertEquals(msg.getEventTime(), 200L);
+ }
}
\ No newline at end of file