You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 07:09:09 UTC

[pulsar] branch branch-2.9 updated (134dd2c0c94 -> d5255e21ced)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 134dd2c0c94 [fix][broker] Update the log print content of createSubscriptions (#18024)
     new 42d0060ccbe [fix][broker]Cache invalidation due to concurrent access (#18076)
     new d5255e21ced [fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic (#18405)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/bookkeeper/mledger/util/RangeCache.java    |  7 +++----
 .../java/org/apache/pulsar/client/impl/MessageIdImpl.java | 10 ++++++++--
 .../pulsar/client/impl}/MessageIdSerializationTest.java   | 15 ++++++++++++---
 3 files changed, 23 insertions(+), 9 deletions(-)
 rename {pulsar-broker/src/test/java/org/apache/pulsar/broker/service => pulsar-client/src/test/java/org/apache/pulsar/client/impl}/MessageIdSerializationTest.java (75%)


[pulsar] 02/02: [fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic (#18405)

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d5255e21ced0828a84d40f1f96a52a0883e80274
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Nov 14 17:27:10 2022 +0800

    [fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic (#18405)
    
    Fixes https://github.com/apache/pulsar/issues/18395
    
    ### Motivation
    
    Older Pulsar clients might not set the `batch_size` field in a batched
    message id. This causes `MessageId#fromByteArrayWithTopic`, which only
    checks the `batch_index` field, to fail with an IllegalStateException.
    
    ### Modifications
    
    Check whether the `batch_size` field exists in `fromByteArrayWithTopic`.
    If it doesn't, create the `BatchMessageIdImpl` instance with the default
    batch size (0) and a disabled acker (`BatchMessageAckerDisabled.INSTANCE`).
    
    Move `MessageIdSerializationTest` to the `pulsar-client` module and add
    `testBatchSizeNotSet` to verify that the change works.
    
    (cherry picked from commit 8246e3bdd2173541b15dc1a26738bf59639949eb)
---
 .../java/org/apache/pulsar/client/impl/MessageIdImpl.java | 10 ++++++++--
 .../pulsar/client/impl}/MessageIdSerializationTest.java   | 15 ++++++++++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 2d571852919..ac37d1d5eb2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -154,8 +154,14 @@ public class MessageIdImpl implements MessageId {
 
         MessageId messageId;
         if (idData.hasBatchIndex()) {
-            messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
-                idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize()));
+            if (idData.hasBatchSize()) {
+                messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
+                        idData.getBatchIndex(), idData.getBatchSize(),
+                        BatchMessageAcker.newAcker(idData.getBatchSize()));
+            } else {
+                messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
+                        idData.getBatchIndex(), 0, BatchMessageAckerDisabled.INSTANCE);
+            }
         } else {
             messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
similarity index 75%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java
rename to pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
index 295c7803372..b1cfad15128 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageIdSerializationTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java
@@ -16,15 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service;
+package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
 import java.io.IOException;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.testng.annotations.Test;
 
-@Test(groups = "broker")
 public class MessageIdSerializationTest {
 
     @Test
@@ -32,6 +30,7 @@ public class MessageIdSerializationTest {
         MessageId id = new MessageIdImpl(1, 2, 3);
         byte[] serializedId = id.toByteArray();
         assertEquals(MessageId.fromByteArray(serializedId), id);
+        assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id);
     }
 
     @Test
@@ -39,6 +38,16 @@ public class MessageIdSerializationTest {
         MessageId id = new MessageIdImpl(1, 2, -1);
         byte[] serializedId = id.toByteArray();
         assertEquals(MessageId.fromByteArray(serializedId), id);
+        assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id);
+    }
+
+    @Test
+    public void testBatchSizeNotSet() throws Exception {
+        MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1,
+                BatchMessageAckerDisabled.INSTANCE);
+        byte[] serialized = id.toByteArray();
+        assertEquals(MessageId.fromByteArray(serialized), id);
+        assertEquals(MessageId.fromByteArrayWithTopic(serialized, "my-topic"), id);
     }
 
     @Test(expectedExceptions = NullPointerException.class)


[pulsar] 01/02: [fix][broker]Cache invalidation due to concurrent access (#18076)

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 42d0060ccbedeca1bd383be18af94cd91df523e6
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Oct 19 16:51:27 2022 +0800

    [fix][broker]Cache invalidation due to concurrent access (#18076)
    
    (cherry picked from commit 7e5cad778907f3a0095343492cbf07ca202976b4)
---
 .../main/java/org/apache/bookkeeper/mledger/util/RangeCache.java   | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index 4a77ac91dca..6d6e71b18fa 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -192,13 +192,12 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
            if (entry == null || timestampExtractor.getTimestamp(entry.getValue()) > maxTimestamp) {
                break;
            }
-
-           entry = entries.pollFirstEntry();
-           if (entry == null) {
+           Value value = entry.getValue();
+           boolean removeHits = entries.remove(entry.getKey(), value);
+           if (!removeHits) {
                break;
            }
 
-           Value value = entry.getValue();
            removedSize += weighter.getSize(value);
            value.release();
        }