You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/04 13:04:58 UTC

[pulsar] branch branch-2.10 updated (4fda75b68b0 -> 102735f4408)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 4fda75b68b0 [fix][broker] Fix calculate avg message per entry (#17046)
     new a3b8627199e [fix][security] Bump PostgreSQL version to 42.4.1(#17066)
     new 102735f4408 [fix][sql]Fix presto sql avro decode error when publish non-batched msgs (#17093)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |  4 +-
 .../pulsar/common/api/raw/RawMessageImpl.java      | 11 +++--
 .../pulsar/common/api/raw/RawMessageImplTest.java  | 55 +++++++++++++++++++---
 3 files changed, 58 insertions(+), 12 deletions(-)


[pulsar] 01/02: [fix][security] Bump PostgreSQL version to 42.4.1(#17066)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a3b8627199ebe79ed8db7bb7a47455adcce91340
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Sat Aug 13 10:14:51 2022 +0800

    [fix][security] Bump PostgreSQL version to 42.4.1(#17066)
    
    (cherry picked from commit 16adb61c9c49f48fa5b045afc785dedb2c2e106d)
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index a607cc67e8f..033470a5a9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,7 +152,7 @@ flexible messaging model and an intuitive client API.</description>
     <guice.version>5.1.0</guice.version>
     <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
     <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
-    <postgresql-jdbc.version>42.3.3</postgresql-jdbc.version>
+    <postgresql-jdbc.version>42.4.1</postgresql-jdbc.version>
     <clickhouse-jdbc.version>0.3.2</clickhouse-jdbc.version>
     <mariadb-jdbc.version>2.7.5</mariadb-jdbc.version>
     <hdfs-offload-version3>3.3.3</hdfs-offload-version3>
@@ -162,7 +162,7 @@ flexible messaging model and an intuitive client API.</description>
     <scala.binary.version>2.13</scala.binary.version>
     <scala-library.version>2.13.6</scala-library.version>
     <debezium.version>1.7.2.Final</debezium.version>
-    <debezium.postgresql.version>42.3.3</debezium.postgresql.version>
+    <debezium.postgresql.version>42.4.1</debezium.postgresql.version>
     <debezium.mysql.version>8.0.28</debezium.mysql.version>
     <jsonwebtoken.version>0.11.1</jsonwebtoken.version>
     <opencensus.version>0.28.0</opencensus.version>


[pulsar] 02/02: [fix][sql]Fix presto sql avro decode error when publish non-batched msgs (#17093)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 102735f44080c56743a8d9421fab5b2db9014e8f
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Fri Aug 19 07:44:45 2022 +0800

    [fix][sql]Fix presto sql avro decode error when publish non-batched msgs (#17093)
    
    (cherry picked from commit 4b98b23d47c5f35c2fb11b3687f139a02e2cb458)
---
 .../pulsar/common/api/raw/RawMessageImpl.java      | 11 +++--
 .../pulsar/common/api/raw/RawMessageImplTest.java  | 55 +++++++++++++++++++---
 2 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index 9f1d493247f..1424af0c359 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -37,6 +37,7 @@ public class RawMessageImpl implements RawMessage {
 
     private ReferenceCountedMessageMetadata msgMetadata;
     private final SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+    private volatile boolean setSingleMessageMetadata;
     private ByteBuf payload;
 
     private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
@@ -57,6 +58,7 @@ public class RawMessageImpl implements RawMessage {
         msgMetadata.release();
         msgMetadata = null;
         singleMessageMetadata.clear();
+        setSingleMessageMetadata = false;
 
         payload.release();
         handle.recycle(this);
@@ -72,6 +74,7 @@ public class RawMessageImpl implements RawMessage {
 
         if (singleMessageMetadata != null) {
             msg.singleMessageMetadata.copyFrom(singleMessageMetadata);
+            msg.setSingleMessageMetadata = true;
         }
         msg.messageId.ledgerId = ledgerId;
         msg.messageId.entryId = entryId;
@@ -90,7 +93,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public Map<String, String> getProperties() {
-        if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
+        if (setSingleMessageMetadata && singleMessageMetadata.getPropertiesCount() > 0) {
             return singleMessageMetadata.getPropertiesList().stream()
                       .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
                               (oldValue, newValue) -> newValue));
@@ -119,7 +122,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public long getEventTime() {
-        if (singleMessageMetadata != null && singleMessageMetadata.hasEventTime()) {
+        if (setSingleMessageMetadata && singleMessageMetadata.hasEventTime()) {
             return singleMessageMetadata.getEventTime();
         } else if (msgMetadata.getMetadata().hasEventTime()) {
             return msgMetadata.getMetadata().getEventTime();
@@ -140,7 +143,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public Optional<String> getKey() {
-        if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) {
+        if (setSingleMessageMetadata && singleMessageMetadata.hasPartitionKey()) {
             return Optional.of(singleMessageMetadata.getPartitionKey());
         } else if (msgMetadata.getMetadata().hasPartitionKey()){
             return Optional.of(msgMetadata.getMetadata().getPartitionKey());
@@ -171,7 +174,7 @@ public class RawMessageImpl implements RawMessage {
 
     @Override
     public boolean hasBase64EncodedKey() {
-        if (singleMessageMetadata != null) {
+        if (setSingleMessageMetadata) {
             return singleMessageMetadata.isPartitionKeyB64Encoded();
         }
         return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
index 99a5f02a2e2..e7270e4546f 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/raw/RawMessageImplTest.java
@@ -18,15 +18,20 @@
  */
 package org.apache.pulsar.common.api.raw;
 
+import static java.util.Collections.singletonList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-
 public class RawMessageImplTest {
 
     private static final String HARD_CODE_KEY = "__pfn_input_topic__";
@@ -38,7 +43,7 @@ public class RawMessageImplTest {
     @Test
     public void testGetProperties() {
         ReferenceCountedMessageMetadata refCntMsgMetadata =
-                ReferenceCountedMessageMetadata.get(Mockito.mock(ByteBuf.class));
+                ReferenceCountedMessageMetadata.get(mock(ByteBuf.class));
         SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
         singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_FIRST);
         singleMessageMetadata.addProperty().setKey(HARD_CODE_KEY).setValue(KEY_VALUE_SECOND);
@@ -50,4 +55,42 @@ public class RawMessageImplTest {
         assertEquals(KEY_VALUE_SECOND, properties.get(HARD_CODE_KEY));
         assertEquals(HARD_CODE_KEY_ID_VALUE, properties.get(HARD_CODE_KEY_ID));
     }
+
+    @Test
+    public void testNonBatchedMessage() {
+        MessageMetadata messageMetadata = new MessageMetadata();
+        messageMetadata.setPartitionKeyB64Encoded(true);
+        messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
+        messageMetadata.setEventTime(100L);
+
+        ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
+        when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+        // Non-batched message's singleMessageMetadata is null
+        RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, null, null, 0, 0, 0);
+        assertTrue(msg.hasBase64EncodedKey());
+        assertEquals(msg.getProperties(), ImmutableMap.of("key1", "value1"));
+        assertEquals(msg.getEventTime(), 100L);
+    }
+
+    @Test
+    public void testBatchedMessage() {
+        MessageMetadata messageMetadata = new MessageMetadata();
+        messageMetadata.setPartitionKeyB64Encoded(true);
+        messageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key1").setValue("value1")));
+        messageMetadata.setEventTime(100L);
+
+        ReferenceCountedMessageMetadata refCntMsgMetadata = mock(ReferenceCountedMessageMetadata.class);
+        when(refCntMsgMetadata.getMetadata()).thenReturn(messageMetadata);
+
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        singleMessageMetadata.setPartitionKeyB64Encoded(false);
+        singleMessageMetadata.addAllProperties(singletonList(new KeyValue().setKey("key2").setValue("value2")));
+        singleMessageMetadata.setEventTime(200L);
+
+        RawMessage msg = RawMessageImpl.get(refCntMsgMetadata, singleMessageMetadata, null, 0, 0, 0);
+        assertFalse(msg.hasBase64EncodedKey());
+        assertEquals(msg.getProperties(), ImmutableMap.of("key2", "value2"));
+        assertEquals(msg.getEventTime(), 200L);
+    }
 }
\ No newline at end of file