Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/16 07:56:52 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

poorbarcode opened a new pull request, #18877:
URL: https://github.com/apache/pulsar/pull/18877

   ### Motivation
   
   The method `consumer.getLastMessageId` returns the ID of the latest message that can be received.
   - If `read compacted` is disabled, it returns the last confirmed position of the `ManagedLedger`.
   - If `read compacted` is enabled, it returns the ID of the latest message that can be read from the compacted topic.
   
   If we send a batch message like this:
   
   ```java
   producer.newMessage().key("k1").value("v0").sendAsync(); // message-id is [3:1,-1:0]
   producer.newMessage().key("k1").value("v1").sendAsync(); // message-id is [3:1,-1:1]
   producer.newMessage().key("k1").value("v2").sendAsync(); // message-id is [3:1,-1:2]
   producer.newMessage().key("k2").value("v0").sendAsync(); // message-id is [3:1,-1:3]
   producer.newMessage().key("k2").value("v1").sendAsync(); // message-id is [3:1,-1:4]
   producer.newMessage().key("k2").value(null).sendAsync(); // message-id is [3:1,-1:5]
   producer.flush();
   ```
   
   After the compaction task is done, the messages with key `k2` will have been deleted by it, so the latest message that can be received is `[3:1,-1:2]`.
   
   --- 
   When we call `consumer.getLastMessageId`, the expected result is:
   
   ```
   [3:1,-1:2]
   ```
   
   --- 
   But the actual result is:
   
   ```
   [3:1,-1:5]
   ```
   
   ### Modifications
   If `read compacted` is enabled and the latest entry of the compacted topic is a batched message, the broker extracts the entry, iterates over its internal messages, and returns the latest message that is not marked `compacted out`.
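   
   The following is a minimal sketch of that idea, not the broker code. It models the batch from the Motivation section under the assumption that compaction keeps only the latest message per key and treats a `null` value as a delete marker. The class `CompactedBatchSketch` and its methods are hypothetical names introduced for illustration.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Simplified model of key-based compaction inside one batch; not broker code.
   class CompactedBatchSketch {

       // Marks a batch index as compacted out when a later message has the same key,
       // or when the message itself is a null-value delete marker.
       static boolean[] markCompactedOut(String[] keys, String[] values) {
           Map<String, Integer> latestIndexByKey = new HashMap<>();
           for (int i = 0; i < keys.length; i++) {
               latestIndexByKey.put(keys[i], i);
           }
           boolean[] compactedOut = new boolean[keys.length];
           for (int i = 0; i < keys.length; i++) {
               boolean supersededByLaterMessage = latestIndexByKey.get(keys[i]) != i;
               compactedOut[i] = supersededByLaterMessage || values[i] == null;
           }
           return compactedOut;
       }

       // Returns the highest index that is still readable, or -1 if none is.
       static int lastLiveIndex(boolean[] compactedOut) {
           int last = -1;
           for (int i = 0; i < compactedOut.length; i++) {
               if (!compactedOut[i]) {
                   last = i;
               }
           }
           return last;
       }

       public static void main(String[] args) {
           String[] keys = {"k1", "k1", "k1", "k2", "k2", "k2"};
           String[] values = {"v0", "v1", "v2", "v0", "v1", null};
           boolean[] compactedOut = markCompactedOut(keys, values);
           System.out.println("batch size - 1: " + (keys.length - 1));
           System.out.println("last live batch index: " + lastLiveIndex(compactedOut));
       }
   }
   ```
   
   In this model, `keys.length - 1` corresponds to the batch index that was returned before the fix, while `lastLiveIndex` corresponds to the index a reader of the compacted topic can actually receive.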
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   - https://github.com/poorbarcode/pulsar/pull/49
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1045277664


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Disable the scheduled task: compaction.
+        conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE);
+        // Disable the scheduled task: retention.
+        conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE);
+    }
+
+    private MessageIdImpl getLastMessageIdByTopic(String topicName) throws Exception{
+        return (MessageIdImpl) pulsar.getBrokerService().getTopic(topicName, false)
+                .get().get().getLastMessageId().get();
+    }
+
+    private void triggerCompactionAndWait(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        persistentTopic.triggerCompaction();
+        Awaitility.await().untilAsserted(() -> {
+            PositionImpl lastConfirmPos = (PositionImpl) persistentTopic.getManagedLedger().getLastConfirmedEntry();
+            PositionImpl markDeletePos = (PositionImpl) persistentTopic
+                    .getSubscription(Compactor.COMPACTION_SUBSCRIPTION).getCursor().getMarkDeletedPosition();
+            assertEquals(markDeletePos.getLedgerId(), lastConfirmPos.getLedgerId());
+            assertEquals(markDeletePos.getEntryId(), lastConfirmPos.getEntryId());
+        });
+    }
+
+    private void triggerLedgerSwitch(String topicName) throws Exception{
+        admin.topics().unload(topicName);
+        Awaitility.await().until(() -> {
+            CompletableFuture<Optional<Topic>> topicFuture =
+                    pulsar.getBrokerService().getTopic(topicName, false);
+            if (!topicFuture.isDone() || topicFuture.isCompletedExceptionally()){
+                return false;
+            }
+            Optional<Topic> topicOptional = topicFuture.join();
+            if (!topicOptional.isPresent()){
+                return false;
+            }
+            PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get();
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+            return managedLedger.getState() == ManagedLedgerImpl.State.LedgerOpened;
+        });
+    }
+
+    private void clearAllTheLedgersOutdated(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            CompletableFuture future = new CompletableFuture();
+            managedLedger.trimConsumedLedgersInBackground(future);
+            future.join();
+            return managedLedger.getLedgersInfo().size() == 1;
+        });
+    }
+
+    @Test
+    public void testGetLastMessageIdWhenLedgerEmpty() throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .readCompacted(true)
+                .subscribe();
+        MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId();
+        assertEquals(messageId.getLedgerId(), -1);
+        assertEquals(messageId.getEntryId(), -1);
+
+        // cleanup.
+        consumer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @Test
+    public void testGetLastMessageIdWhenNoNonEmptyLedgerExists() throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        ReaderImpl<String> reader = (ReaderImpl<String>) pulsarClient.newReader(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(false)
+                .create();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        producer.newMessage().key("k0").value("v0").sendAsync().get();
+        reader.readNext();
+        triggerLedgerSwitch(topicName);
+        clearAllTheLedgersOutdated(topicName);
+
+        MessageIdImpl messageId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
+        assertEquals(messageId.getLedgerId(), -1);
+        assertEquals(messageId.getEntryId(), -1);
+
+        // cleanup.
+        reader.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @DataProvider(name = "enabledBatch")
+    public Object[][] enabledBatch(){
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "enabledBatch")
+    public void testGetLastMessageIdBeforeCompaction(boolean enabledBatch) throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .readCompacted(true)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(enabledBatch)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
+        producer.flush();
+        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
+        producer.flush();
+        FutureUtil.waitForAll(sendFutures).join();
+
+        MessageIdImpl lastMessageIdExpected = getLastMessageIdByTopic(topicName);
+        MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId();
+        assertEquals(lastMessageId.getLedgerId(), lastMessageIdExpected.getLedgerId());
+        assertEquals(lastMessageId.getEntryId(), lastMessageIdExpected.getEntryId());
+        if (enabledBatch){
+            BatchMessageIdImpl lastBatchMessageIdByTopic = (BatchMessageIdImpl) lastMessageIdExpected;
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
+            assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdByTopic.getBatchSize());
+            assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdByTopic.getBatchIndex());
+        }
+
+        // cleanup.
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @Test(dataProvider = "enabledBatch")
+    public void testGetLastMessageIdAfterCompaction(boolean enabledBatch) throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .readCompacted(true)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(enabledBatch)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
+        producer.flush();
+        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
+        producer.flush();
+        FutureUtil.waitForAll(sendFutures).join();

Review Comment:
   There is no need to use a list of futures to wait for a series of `sendAsync` calls. Just wait for the last future, like this:
   
   ```java
   producer.sendAsync("msg-0");
   producer.sendAsync("msg-1");
   ...
   var lastSendFuture = producer.sendAsync("msg-N");
   producer.flush();
   lastSendFuture.get();
   ```
   
   In addition, to avoid messages being spread across multiple batches, it's better to configure a large batch timeout and then use `producer.flush()` to flush the batched messages.
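   
   The reasoning behind waiting only on the last future can be sketched without a broker. The example below is a stand-in, not Pulsar client code: it assumes, as the suggestion implies, that send futures complete in send order, and models that with a single-threaded executor. The class `LastFutureWaitSketch` and its method are hypothetical.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.atomic.AtomicInteger;

   // Stand-in for an ordered producer; assumes completions happen in submission order.
   class LastFutureWaitSketch {

       // Submits messageCount "sends", waits only on the last future, and returns
       // how many sends had completed at that point.
       static int sendAllAndWaitForLast(int messageCount) throws Exception {
           ExecutorService inOrder = Executors.newSingleThreadExecutor();
           AtomicInteger completed = new AtomicInteger();
           try {
               CompletableFuture<Integer> last = null;
               for (int i = 0; i < messageCount; i++) {
                   // Each task runs after all previously submitted tasks have finished.
                   last = CompletableFuture.supplyAsync(completed::incrementAndGet, inOrder);
               }
               if (last != null) {
                   last.get();
               }
               return completed.get();
           } finally {
               inOrder.shutdown();
           }
       }

       public static void main(String[] args) throws Exception {
           System.out.println("completed sends: " + sendAllAndWaitForLast(6));
       }
   }
   ```
   
   If completion order were not guaranteed, waiting on every future would still be the safer choice.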
   
   





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1165000498


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);

Review Comment:
    @codelipenghui 
   
   I think this can error if the payload is compressed, encrypted, or both.
   
   Also, it would be insecure when decrypting customer data here if encrypted.
   
   Can we fix this issue by looking at the metadata only?
   
   
   





[GitHub] [pulsar] poorbarcode commented on pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#issuecomment-1356223408

   Hi @gaoran10 
   
   > Maybe we can consider adding a property in message metadata to indicate the last valid message index, I'm afraid checking single metadata will cost too many resources if there are many messages in a batch.
   
   Good suggestion.
   
   I will fix it in another PR. To avoid modifying the definition of message metadata, I chose a different solution to improve the performance of extracting batched messages. See:
   - https://github.com/poorbarcode/pulsar/pull/50
   
   After the current PR is merged, I will push the PR above to the apache repository.




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1045352916


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
+            singleMessagePayload.release();
+            if (singleMessageMetadata.isCompactedOut()){
+                continue;
+            }
+            lastBatchIndexInBatch = i;
+        }
+        return lastBatchIndexInBatch;

Review Comment:
   Oh, I see. It's limited by the implementation of `Commands#deSerializeSingleMessageInBatch`. Since `GetLastMessageId` should not be on a hot path, it's okay.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1045275815


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
+            singleMessagePayload.release();
+            if (singleMessageMetadata.isCompactedOut()){
+                continue;
+            }
+            lastBatchIndexInBatch = i;
+        }
+        return lastBatchIndexInBatch;

Review Comment:
   ```suggestion
           for (int i = batchSize - 1; i >= 0; i--) {
               ByteBuf singleMessagePayload =
                       Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
               singleMessagePayload.release();
               if (!singleMessageMetadata.isCompactedOut()) {
                   return i;
               }
           }
           return -1;
   ```
   
   Use reverse iteration to avoid unnecessary iterations, because most of the time the expected batch index is `num_messages - 1`.





[GitHub] [pulsar] congbobo184 merged pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
congbobo184 merged PR #18877:
URL: https://github.com/apache/pulsar/pull/18877




[GitHub] [pulsar] heesung-sn commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1165000498


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);

Review Comment:
   @poorbarcode 
   
   I think this can error if the payload is compressed, encrypted, or both.
   
   Also, it would be insecure when decrypting customer data here if encrypted.
   
   Can we fix this issue by looking at the metadata only?
   
   
   





[GitHub] [pulsar] poorbarcode closed pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
poorbarcode closed pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted
URL: https://github.com/apache/pulsar/pull/18877




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1052795762


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
+            singleMessagePayload.release();
+            if (singleMessageMetadata.isCompactedOut()){
+                continue;

Review Comment:
   Hi Bo
   The layout of a batched message entry is `[single_msg_size_1, single_msg_content_1, single_msg_size_2, single_msg_content_2......]`, so we cannot extract the last internal message directly. Same as https://github.com/apache/pulsar/pull/18877/files/b428afd7916b94542a20acf8f1f029bddcc0da81#r1050335022
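   
   A minimal sketch of why such a layout forces a front-to-back scan is shown below. The record format here is hypothetical and is not Pulsar's wire format: each record is a 4-byte length followed by that many content bytes, and the first content byte is assumed to be `1` when the message is compacted out. The class `ForwardBatchScanSketch` is introduced only for illustration.
   
   ```java
   import java.nio.ByteBuffer;

   // Hypothetical length-prefixed batch layout; not Pulsar's actual serialization.
   class ForwardBatchScanSketch {

       // Encodes records as [size][flag byte + payload], one after another.
       static ByteBuffer encode(boolean[] compactedOut, byte[][] payloads) {
           int total = 0;
           for (byte[] payload : payloads) {
               total += 4 + 1 + payload.length;
           }
           ByteBuffer buf = ByteBuffer.allocate(total);
           for (int i = 0; i < payloads.length; i++) {
               buf.putInt(1 + payloads[i].length);
               buf.put((byte) (compactedOut[i] ? 1 : 0));
               buf.put(payloads[i]);
           }
           buf.flip();
           return buf;
       }

       // The start offset of record i is known only after reading the sizes of
       // records 0..i-1, so the scan has to run from the front of the buffer.
       static int lastLiveIndex(ByteBuffer batch, int batchSize) {
           ByteBuffer buf = batch.duplicate();
           int last = -1;
           for (int i = 0; i < batchSize; i++) {
               int size = buf.getInt();
               int contentStart = buf.position();
               boolean compactedOut = buf.get(contentStart) == 1;
               if (!compactedOut) {
                   last = i;
               }
               buf.position(contentStart + size);
           }
           return last;
       }

       public static void main(String[] args) {
           boolean[] flags = {true, true, false, true, true, true};
           byte[][] payloads = {
               "v0".getBytes(), "v1".getBytes(), "v2".getBytes(),
               "v0".getBytes(), "v1".getBytes(), new byte[0]
           };
           System.out.println("last live batch index: " + lastLiveIndex(encode(flags, payloads), 6));
       }
   }
   ```
   
   Because the records have variable length and there is no trailing index, a reverse loop over `i` would still have to walk all earlier records to locate record `i`.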





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1050335022


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);

Review Comment:
   Perhaps reverse order is better?







[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1045340232


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
+            singleMessagePayload.release();
+            if (singleMessageMetadata.isCompactedOut()){
+                continue;
+            }
+            lastBatchIndexInBatch = i;
+        }
+        return lastBatchIndexInBatch;

Review Comment:
   This is a good suggestion, but batch messages can only be parsed from front to back (^_^)





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1045277793


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Disable the scheduled task: compaction.
+        conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE);
+        // Disable the scheduled task: retention.
+        conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE);
+    }
+
+    private MessageIdImpl getLastMessageIdByTopic(String topicName) throws Exception{
+        return (MessageIdImpl) pulsar.getBrokerService().getTopic(topicName, false)
+                .get().get().getLastMessageId().get();
+    }
+
+    private void triggerCompactionAndWait(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        persistentTopic.triggerCompaction();
+        Awaitility.await().untilAsserted(() -> {
+            PositionImpl lastConfirmPos = (PositionImpl) persistentTopic.getManagedLedger().getLastConfirmedEntry();
+            PositionImpl markDeletePos = (PositionImpl) persistentTopic
+                    .getSubscription(Compactor.COMPACTION_SUBSCRIPTION).getCursor().getMarkDeletedPosition();
+            assertEquals(markDeletePos.getLedgerId(), lastConfirmPos.getLedgerId());
+            assertEquals(markDeletePos.getEntryId(), lastConfirmPos.getEntryId());
+        });
+    }
+
+    private void triggerLedgerSwitch(String topicName) throws Exception{
+        admin.topics().unload(topicName);
+        Awaitility.await().until(() -> {
+            CompletableFuture<Optional<Topic>> topicFuture =
+                    pulsar.getBrokerService().getTopic(topicName, false);
+            if (!topicFuture.isDone() || topicFuture.isCompletedExceptionally()){
+                return false;
+            }
+            Optional<Topic> topicOptional = topicFuture.join();
+            if (!topicOptional.isPresent()){
+                return false;
+            }
+            PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get();
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+            return managedLedger.getState() == ManagedLedgerImpl.State.LedgerOpened;
+        });
+    }
+
+    private void clearAllTheLedgersOutdated(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            CompletableFuture future = new CompletableFuture();

Review Comment:
   ```suggestion
               CompletableFuture<Void> future = new CompletableFuture<>();
   ```





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1045275815


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
+            singleMessagePayload.release();
+            if (singleMessageMetadata.isCompactedOut()){
+                continue;
+            }
+            lastBatchIndexInBatch = i;
+        }
+        return lastBatchIndexInBatch;

Review Comment:
   ```suggestion
           for (int i = batchSize - 1; i >= 0; i--) {
               ByteBuf singleMessagePayload =
                       Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
               singleMessagePayload.release();
               if (!singleMessageMetadata.isCompactedOut()) {
                   return i;
               }
           }
           return -1;
   ```
   
   Use reverse iteration to avoid unnecessary iterations, because most of the time the last message in the batch is not compacted out.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1045340787


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Disable the scheduled task: compaction.
+        conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE);
+        // Disable the scheduled task: retention.
+        conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE);
+    }
+
+    private MessageIdImpl getLastMessageIdByTopic(String topicName) throws Exception{
+        return (MessageIdImpl) pulsar.getBrokerService().getTopic(topicName, false)
+                .get().get().getLastMessageId().get();
+    }
+
+    private void triggerCompactionAndWait(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        persistentTopic.triggerCompaction();
+        Awaitility.await().untilAsserted(() -> {
+            PositionImpl lastConfirmPos = (PositionImpl) persistentTopic.getManagedLedger().getLastConfirmedEntry();
+            PositionImpl markDeletePos = (PositionImpl) persistentTopic
+                    .getSubscription(Compactor.COMPACTION_SUBSCRIPTION).getCursor().getMarkDeletedPosition();
+            assertEquals(markDeletePos.getLedgerId(), lastConfirmPos.getLedgerId());
+            assertEquals(markDeletePos.getEntryId(), lastConfirmPos.getEntryId());
+        });
+    }
+
+    private void triggerLedgerSwitch(String topicName) throws Exception{
+        admin.topics().unload(topicName);
+        Awaitility.await().until(() -> {
+            CompletableFuture<Optional<Topic>> topicFuture =
+                    pulsar.getBrokerService().getTopic(topicName, false);
+            if (!topicFuture.isDone() || topicFuture.isCompletedExceptionally()){
+                return false;
+            }
+            Optional<Topic> topicOptional = topicFuture.join();
+            if (!topicOptional.isPresent()){
+                return false;
+            }
+            PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get();
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+            return managedLedger.getState() == ManagedLedgerImpl.State.LedgerOpened;
+        });
+    }
+
+    private void clearAllTheLedgersOutdated(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            CompletableFuture future = new CompletableFuture();
+            managedLedger.trimConsumedLedgersInBackground(future);
+            future.join();
+            return managedLedger.getLedgersInfo().size() == 1;
+        });
+    }
+
+    @Test
+    public void testGetLastMessageIdWhenLedgerEmpty() throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .readCompacted(true)
+                .subscribe();
+        MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId();
+        assertEquals(messageId.getLedgerId(), -1);
+        assertEquals(messageId.getEntryId(), -1);
+
+        // cleanup.
+        consumer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @Test
+    public void testGetLastMessageIdWhenNoNonEmptyLedgerExists() throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        ReaderImpl<String> reader = (ReaderImpl<String>) pulsarClient.newReader(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(false)
+                .create();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        producer.newMessage().key("k0").value("v0").sendAsync().get();
+        reader.readNext();
+        triggerLedgerSwitch(topicName);
+        clearAllTheLedgersOutdated(topicName);
+
+        MessageIdImpl messageId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
+        assertEquals(messageId.getLedgerId(), -1);
+        assertEquals(messageId.getEntryId(), -1);
+
+        // cleanup.
+        reader.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @DataProvider(name = "enabledBatch")
+    public Object[][] enabledBatch(){
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "enabledBatch")
+    public void testGetLastMessageIdBeforeCompaction(boolean enabledBatch) throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .readCompacted(true)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(enabledBatch)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
+        producer.flush();
+        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
+        producer.flush();
+        FutureUtil.waitForAll(sendFutures).join();
+
+        MessageIdImpl lastMessageIdExpected = getLastMessageIdByTopic(topicName);
+        MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId();
+        assertEquals(lastMessageId.getLedgerId(), lastMessageIdExpected.getLedgerId());
+        assertEquals(lastMessageId.getEntryId(), lastMessageIdExpected.getEntryId());
+        if (enabledBatch){
+            BatchMessageIdImpl lastBatchMessageIdByTopic = (BatchMessageIdImpl) lastMessageIdExpected;
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
+            assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdByTopic.getBatchSize());
+            assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdByTopic.getBatchIndex());
+        }
+
+        // cleanup.
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @Test(dataProvider = "enabledBatch")
+    public void testGetLastMessageIdAfterCompaction(boolean enabledBatch) throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .readCompacted(true)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(enabledBatch)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
+        producer.flush();
+        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
+        producer.flush();
+        FutureUtil.waitForAll(sendFutures).join();

Review Comment:
   If we later need the message ID returned by a specific send, this change will not satisfy that requirement.
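
   A minimal sketch of the idea, assuming the comment is about keeping one future per send. `SendFuturesSketch` and `fakeSendAsync` are hypothetical stand-ins and do not use the Pulsar client; the IDs are made-up strings. Collecting the futures in send order makes the result of any individual send available afterwards.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   final class SendFuturesSketch {
       // Stand-in for an asynchronous send; completes with a made-up ID string.
       static CompletableFuture<String> fakeSendAsync(int seq) {
           return CompletableFuture.supplyAsync(() -> "id-" + seq);
       }

       // Issues `count` sends, waits for all of them, and returns the IDs in send order.
       static List<String> sendAll(int count) {
           List<CompletableFuture<String>> futures = new ArrayList<>();
           for (int i = 0; i < count; i++) {
               futures.add(fakeSendAsync(i));
           }
           CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
           List<String> ids = new ArrayList<>();
           for (CompletableFuture<String> f : futures) {
               ids.add(f.join()); // already complete, so this does not block
           }
           return ids;
       }

       public static void main(String[] args) {
           List<String> ids = sendAll(6);
           System.out.println(ids.get(2)); // ID produced by the third send
       }
   }
   ```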



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Disable the scheduled task: compaction.
+        conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE);
+        // Disable the scheduled task: retention.
+        conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE);
+    }
+
+    private MessageIdImpl getLastMessageIdByTopic(String topicName) throws Exception{
+        return (MessageIdImpl) pulsar.getBrokerService().getTopic(topicName, false)
+                .get().get().getLastMessageId().get();
+    }
+
+    private void triggerCompactionAndWait(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        persistentTopic.triggerCompaction();
+        Awaitility.await().untilAsserted(() -> {
+            PositionImpl lastConfirmPos = (PositionImpl) persistentTopic.getManagedLedger().getLastConfirmedEntry();
+            PositionImpl markDeletePos = (PositionImpl) persistentTopic
+                    .getSubscription(Compactor.COMPACTION_SUBSCRIPTION).getCursor().getMarkDeletedPosition();
+            assertEquals(markDeletePos.getLedgerId(), lastConfirmPos.getLedgerId());
+            assertEquals(markDeletePos.getEntryId(), lastConfirmPos.getEntryId());
+        });
+    }
+
+    private void triggerLedgerSwitch(String topicName) throws Exception{
+        admin.topics().unload(topicName);
+        Awaitility.await().until(() -> {
+            CompletableFuture<Optional<Topic>> topicFuture =
+                    pulsar.getBrokerService().getTopic(topicName, false);
+            if (!topicFuture.isDone() || topicFuture.isCompletedExceptionally()){
+                return false;
+            }
+            Optional<Topic> topicOptional = topicFuture.join();
+            if (!topicOptional.isPresent()){
+                return false;
+            }
+            PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get();
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+            return managedLedger.getState() == ManagedLedgerImpl.State.LedgerOpened;
+        });
+    }
+
+    private void clearAllTheLedgersOutdated(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            CompletableFuture future = new CompletableFuture();

Review Comment:
   Already fixed





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1050342116


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);

Review Comment:
   Ah, we can't extract batch messages in reverse order. Same as https://github.com/apache/pulsar/pull/18877#discussion_r1045275815





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1052323295


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
+            singleMessagePayload.release();
+            if (singleMessageMetadata.isCompactedOut()){
+                continue;

Review Comment:
   Would it be better to check in reverse order? If the message at the last batch index is not `CompactedOut`, we can return directly.





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1050323779


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
+            singleMessagePayload.release();
+            if (singleMessageMetadata.isCompactedOut()){
+                continue;
+            }
+            lastBatchIndexInBatch = i;
+        }
+        return lastBatchIndexInBatch;

Review Comment:
   Maybe we can consider adding a new field to the message metadata to indicate the last valid message index. I'm afraid that checking the metadata of every single message will cost too many resources.
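
   A rough sketch of what this suggestion could look like, under stated assumptions: `BatchMeta` and its `lastValidIndex` field are hypothetical and do not exist in Pulsar's `MessageMetadata`. The index is computed once when the batch is written (for example by the compactor), so the read path does not need to deserialize each single-message metadata.

   ```java
   final class LastValidIndexSketch {
       // Hypothetical batch-level metadata; lastValidIndex is an illustrative field only.
       static final class BatchMeta {
           final int numMessages;
           final int lastValidIndex;

           BatchMeta(int numMessages, int lastValidIndex) {
               this.numMessages = numMessages;
               this.lastValidIndex = lastValidIndex;
           }
       }

       // Write side: one pass over the per-message flags, done once per batch.
       static BatchMeta buildMeta(boolean[] compactedOut) {
           int last = -1;
           for (int i = 0; i < compactedOut.length; i++) {
               if (!compactedOut[i]) {
                   last = i;
               }
           }
           return new BatchMeta(compactedOut.length, last);
       }

       // Read side: constant-time lookup, no per-message parsing.
       static int lastBatchIndex(BatchMeta meta) {
           return meta.lastValidIndex;
       }

       public static void main(String[] args) {
           BatchMeta meta = buildMeta(new boolean[] {true, true, false, true, true, true});
           System.out.println(lastBatchIndex(meta)); // prints 2
       }
   }
   ```

   The trade-off is a metadata format change in exchange for moving the scan from every `getLastMessageId` call to a single pass at write time.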





[GitHub] [pulsar] poorbarcode commented on pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#issuecomment-1356223595

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] codecov-commenter commented on pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#issuecomment-1354363593

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/18877?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#18877](https://codecov.io/gh/apache/pulsar/pull/18877?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b428afd) into [master](https://codecov.io/gh/apache/pulsar/commit/3180a4aa04d518fa401a781d646545221c4d1fa6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3180a4a) will **decrease** coverage by `9.37%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #18877      +/-   ##
   ============================================
   - Coverage     46.17%   36.80%   -9.38%     
   + Complexity    10359     1956    -8403     
   ============================================
     Files           703      209     -494     
     Lines         68845    14402   -54443     
     Branches       7382     1569    -5813     
   ============================================
   - Hits          31788     5300   -26488     
   + Misses        33448     8526   -24922     
   + Partials       3609      576    -3033     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `36.80% <ø> (-9.38%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/18877?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...he/pulsar/client/impl/PartitionedProducerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1BhcnRpdGlvbmVkUHJvZHVjZXJJbXBsLmphdmE=) | `30.34% <0.00%> (-5.13%)` | :arrow_down: |
   | [.../apache/pulsar/client/impl/BatchMessageIdImpl.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0JhdGNoTWVzc2FnZUlkSW1wbC5qYXZh) | `67.50% <0.00%> (-4.73%)` | :arrow_down: |
   | [...va/org/apache/pulsar/client/impl/ProducerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1Byb2R1Y2VySW1wbC5qYXZh) | `15.66% <0.00%> (-1.34%)` | :arrow_down: |
   | [.../pulsar/client/impl/ProducerStatsRecorderImpl.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1Byb2R1Y2VyU3RhdHNSZWNvcmRlckltcGwuamF2YQ==) | `84.04% <0.00%> (-0.62%)` | :arrow_down: |
   | [...he/pulsar/client/impl/MultiTopicsConsumerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL011bHRpVG9waWNzQ29uc3VtZXJJbXBsLmphdmE=) | `22.78% <0.00%> (-0.09%)` | :arrow_down: |
   | [...va/org/apache/pulsar/client/impl/ConsumerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVySW1wbC5qYXZh) | `15.09% <0.00%> (-0.04%)` | :arrow_down: |
   | [...va/org/apache/pulsar/broker/service/ServerCnx.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1NlcnZlckNueC5qYXZh) | | |
   | [...pulsar/broker/loadbalance/ResourceDescription.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9sb2FkYmFsYW5jZS9SZXNvdXJjZURlc2NyaXB0aW9uLmphdmE=) | | |
   | [.../metadata/v2/TransactionBufferSnapshotIndexes.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvbWV0YWRhdGEvdjIvVHJhbnNhY3Rpb25CdWZmZXJTbmFwc2hvdEluZGV4ZXMuamF2YQ==) | | |
   | [...ava/org/apache/pulsar/broker/stats/TimeWindow.java](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zdGF0cy9UaW1lV2luZG93LmphdmE=) | | |
   | ... and [492 more](https://codecov.io/gh/apache/pulsar/pull/18877/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1052795762


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
+            singleMessagePayload.release();
+            if (singleMessageMetadata.isCompactedOut()){
+                continue;

Review Comment:
   Hi @congbobo184 
   The structure of a batched message entry is `[single_msg_size_1, single_msg_content_1, single_msg_size_2, single_msg_content_2, ...]`, so we cannot extract the last internal message directly. Each message's offset is known only after the sizes of all preceding messages have been read.
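
A minimal, self-contained sketch of why the scan has to be sequential, assuming a simplified layout that is not Pulsar's real wire format: each message is written as `[int size][byte compactedOut][payload]`. The class `BatchScanSketch` and its methods `encode` and `lastLiveIndex` are hypothetical names used only for illustration. The real code deserializes a `SingleMessageMetadata` per message instead of reading a flag byte.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BatchScanSketch {

    // Assumed layout per message (illustrative only): [int size][byte compactedOut][payload].
    // "size" counts the flag byte plus the payload bytes.
    public static ByteBuffer encode(String[] values, boolean[] compactedOut) {
        byte[][] payloads = new byte[values.length][];
        int total = 0;
        for (int i = 0; i < values.length; i++) {
            payloads[i] = values[i].getBytes(StandardCharsets.UTF_8);
            total += Integer.BYTES + 1 + payloads[i].length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (int i = 0; i < values.length; i++) {
            buf.putInt(1 + payloads[i].length);
            buf.put((byte) (compactedOut[i] ? 1 : 0));
            buf.put(payloads[i]);
        }
        buf.flip();
        return buf;
    }

    // Walks the batch front to back. The start of message i is known only after
    // the sizes of messages 0..i-1 have been read, so there is no way to jump
    // straight to the last message.
    public static int lastLiveIndex(ByteBuffer batch, int batchSize) {
        if (batchSize <= 1) {
            return -1; // mirrors the diff: treated as a non-batched entry
        }
        ByteBuffer buf = batch.duplicate(); // leave the caller's position untouched
        int last = -1;
        for (int i = 0; i < batchSize; i++) {
            int size = buf.getInt();
            boolean isCompactedOut = buf.get() == 1;
            buf.position(buf.position() + size - 1); // skip the payload
            if (!isCompactedOut) {
                last = i;
            }
        }
        return last;
    }

    public static void main(String[] args) {
        // Same shape as the example in the PR description: after compaction only
        // the third message (index 2) is still live.
        String[] values = {"v0", "v1", "v2", "v0", "v1", ""};
        boolean[] compactedOut = {true, true, false, true, true, true};
        ByteBuffer batch = encode(values, compactedOut);
        System.out.println(lastLiveIndex(batch, values.length)); // prints 2
    }
}
```

The cost is linear in the batch size, which is the overhead the earlier review comment is concerned about. A per-entry field recording the last live index would avoid the scan, but it would require a metadata change.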





[GitHub] [pulsar] poorbarcode commented on pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#issuecomment-1356975653

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1045353417


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Disable the scheduled task: compaction.
+        conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE);
+        // Disable the scheduled task: retention.
+        conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE);
+    }
+
+    private MessageIdImpl getLastMessageIdByTopic(String topicName) throws Exception{
+        return (MessageIdImpl) pulsar.getBrokerService().getTopic(topicName, false)
+                .get().get().getLastMessageId().get();
+    }
+
+    private void triggerCompactionAndWait(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        persistentTopic.triggerCompaction();
+        Awaitility.await().untilAsserted(() -> {
+            PositionImpl lastConfirmPos = (PositionImpl) persistentTopic.getManagedLedger().getLastConfirmedEntry();
+            PositionImpl markDeletePos = (PositionImpl) persistentTopic
+                    .getSubscription(Compactor.COMPACTION_SUBSCRIPTION).getCursor().getMarkDeletedPosition();
+            assertEquals(markDeletePos.getLedgerId(), lastConfirmPos.getLedgerId());
+            assertEquals(markDeletePos.getEntryId(), lastConfirmPos.getEntryId());
+        });
+    }
+
+    private void triggerLedgerSwitch(String topicName) throws Exception{
+        admin.topics().unload(topicName);
+        Awaitility.await().until(() -> {
+            CompletableFuture<Optional<Topic>> topicFuture =
+                    pulsar.getBrokerService().getTopic(topicName, false);
+            if (!topicFuture.isDone() || topicFuture.isCompletedExceptionally()){
+                return false;
+            }
+            Optional<Topic> topicOptional = topicFuture.join();
+            if (!topicOptional.isPresent()){
+                return false;
+            }
+            PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get();
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+            return managedLedger.getState() == ManagedLedgerImpl.State.LedgerOpened;
+        });
+    }
+
+    private void clearAllTheLedgersOutdated(String topicName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            CompletableFuture future = new CompletableFuture();
+            managedLedger.trimConsumedLedgersInBackground(future);
+            future.join();
+            return managedLedger.getLedgersInfo().size() == 1;
+        });
+    }
+
+    @Test
+    public void testGetLastMessageIdWhenLedgerEmpty() throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .readCompacted(true)
+                .subscribe();
+        MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId();
+        assertEquals(messageId.getLedgerId(), -1);
+        assertEquals(messageId.getEntryId(), -1);
+
+        // cleanup.
+        consumer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @Test
+    public void testGetLastMessageIdWhenNoNonEmptyLedgerExists() throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        ReaderImpl<String> reader = (ReaderImpl<String>) pulsarClient.newReader(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(false)
+                .create();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        producer.newMessage().key("k0").value("v0").sendAsync().get();
+        reader.readNext();
+        triggerLedgerSwitch(topicName);
+        clearAllTheLedgersOutdated(topicName);
+
+        MessageIdImpl messageId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
+        assertEquals(messageId.getLedgerId(), -1);
+        assertEquals(messageId.getEntryId(), -1);
+
+        // cleanup.
+        reader.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @DataProvider(name = "enabledBatch")
+    public Object[][] enabledBatch(){
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "enabledBatch")
+    public void testGetLastMessageIdBeforeCompaction(boolean enabledBatch) throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .readCompacted(true)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(enabledBatch)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
+        producer.flush();
+        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
+        producer.flush();
+        FutureUtil.waitForAll(sendFutures).join();
+
+        MessageIdImpl lastMessageIdExpected = getLastMessageIdByTopic(topicName);
+        MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId();
+        assertEquals(lastMessageId.getLedgerId(), lastMessageIdExpected.getLedgerId());
+        assertEquals(lastMessageId.getEntryId(), lastMessageIdExpected.getEntryId());
+        if (enabledBatch){
+            BatchMessageIdImpl lastBatchMessageIdByTopic = (BatchMessageIdImpl) lastMessageIdExpected;
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId;
+            assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdByTopic.getBatchSize());
+            assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdByTopic.getBatchIndex());
+        }
+
+        // cleanup.
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
+    @Test(dataProvider = "enabledBatch")
+    public void testGetLastMessageIdAfterCompaction(boolean enabledBatch) throws Exception {
+        String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .receiverQueueSize(1)
+                .readCompacted(true)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(enabledBatch)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
+        producer.flush();
+        sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
+        sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
+        producer.flush();
+        FutureUtil.waitForAll(sendFutures).join();

Review Comment:
   I see. I just found that `testGetLastMessageIdAfterCompactionAllNullMsg` only uses the last message ID.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18877: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18877:
URL: https://github.com/apache/pulsar/pull/18877#discussion_r1166226668


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2017,6 +2027,25 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo
         });
     }
 
+    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+        int batchSize = metadata.getNumMessagesInBatch();
+        if (batchSize <= 1){
+            return -1;
+        }
+        SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
+        int lastBatchIndexInBatch = -1;
+        for (int i = 0; i < batchSize; i++){
+            ByteBuf singleMessagePayload =
+                    Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);

Review Comment:
   @heesung-sn 
   
   I think you are right. Thanks


