You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/02/06 06:46:47 UTC
[pulsar] branch branch-2.10 updated: [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted (#18877)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 6e93fe44ce9 [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted (#18877)
6e93fe44ce9 is described below
commit 6e93fe44ce9d9fe74f5714bb76d592d8413f38f8
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Dec 20 10:37:27 2022 +0800
[fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted (#18877)
### Motivation
The method `consumer.getLastMessageId` returns the ID of the latest message that can be received.
- If `read compacted` is disabled, it returns the last confirmed position of the `ManagedLedger`.
- If `read compacted` is enabled, it returns the ID of the latest message that can be read from the compacted topic.
If we send a batch message like this:
```java
producer.newMessage().key("k1").value("v0").sendAsync(); // message-id is [3:1,-1:0]
producer.newMessage().key("k1").value("v1").sendAsync(); // message-id is [3:1,-1:1]
producer.newMessage().key("k1").value("v2").sendAsync(); // message-id is [3:1,-1:2]
producer.newMessage().key("k2").value("v0").sendAsync(); // message-id is [3:1,-1:3]
producer.newMessage().key("k2").value("v1").sendAsync(); // message-id is [3:1,-1:4]
producer.newMessage().key("k2").value(null).sendAsync(); // message-id is [3:1,-1:5]
producer.flush();
```
After the compaction task is done, all messages with key `k2` have been removed by it, because the last value written for `k2` is `null`. The latest message that can be received is therefore `[3:1,-1:2]`.
---
When we call `consumer.getLastMessageId`, the expected result is:
```
[3:1,-1:2]
```
---
But the actual result is:
```
[3:1,-1:5]
```
### Modifications
If `read compacted` is enabled and the latest entry of the compacted topic is a batched message, deserialize the entry, iterate over all of its internal messages, and return the batch index of the last message that is not marked `compacted out`.
(cherry picked from commit 83993ae91fa0bb845fe84a8ead15f6d9aa26069f)
---
.../apache/pulsar/broker/service/ServerCnx.java | 35 +-
.../compaction/GetLastMessageIdCompactedTest.java | 375 +++++++++++++++++++++
2 files changed, 407 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 33998cb6696..1f08db86aaf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -38,6 +38,7 @@ import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise;
import io.prometheus.client.Gauge;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
@@ -128,6 +129,7 @@ import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
@@ -1911,9 +1913,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (entry != null) {
// in this case, all the data has been compacted, so return the last position
// in the compacted ledger to the client
- MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
- int bs = metadata.getNumMessagesInBatch();
- int largestBatchIndex = bs > 0 ? bs - 1 : -1;
+ ByteBuf payload = entry.getDataBuffer();
+ MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+ int largestBatchIndex;
+ try {
+ largestBatchIndex = calculateTheLastBatchIndexInBatch(metadata, payload);
+ } catch (IOException ioEx){
+ ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError,
+ "Failed to deserialize batched message from the last entry of the compacted Ledger: "
+ + ioEx.getMessage()));
+ return;
+ }
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
@@ -1936,6 +1946,25 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
});
}
+ /**
+  * Finds the batch index of the last message in a batch entry that is not flagged as compacted out.
+  *
+  * @param metadata entry-level metadata; supplies the number of messages in the batch
+  * @param payload  buffer handed to {@code Commands.deSerializeSingleMessageInBatch} for every
+  *                 index, in ascending order
+  * @return the highest index whose single-message metadata is not compacted out, or {@code -1}
+  *         when the batch holds at most one message or every message is compacted out
+  * @throws IOException if a single message cannot be deserialized from {@code payload}
+  */
+ private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
+     final int numMessages = metadata.getNumMessagesInBatch();
+     if (numMessages <= 1) {
+         return -1;
+     }
+     // One metadata instance is reused for every message of the batch.
+     final SingleMessageMetadata reusableMetadata = new SingleMessageMetadata();
+     int lastRetainedIndex = -1;
+     int index = 0;
+     while (index < numMessages) {
+         // Only the metadata matters here, so the returned payload slice is released right away.
+         Commands.deSerializeSingleMessageInBatch(payload, reusableMetadata, index, numMessages).release();
+         if (!reusableMetadata.isCompactedOut()) {
+             lastRetainedIndex = index;
+         }
+         index++;
+     }
+     return lastRetainedIndex;
+ }
+
private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName,
NamespaceOperation operation) {
if (!service.isAuthorizationEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
new file mode 100644
index 00000000000..0be9fa40754
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
+
+ /**
+  * Class-level setup: runs the inherited {@code internalSetup()} and then
+  * {@code producerBaseSetup()}.
+  */
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+     internalSetup();
+     producerBaseSetup();
+ }
+
+ /**
+  * Class-level teardown: delegates to the inherited {@code internalCleanup()}.
+  */
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+     internalCleanup();
+ }
+
+ /**
+  * Applies the base configuration and then sets the compaction-monitor and retention-check
+  * intervals to {@code Integer.MAX_VALUE}, which is how this test disables both scheduled tasks.
+  */
+ @Override
+ protected void doInitConf() throws Exception {
+     super.doInitConf();
+     final int effectivelyNever = Integer.MAX_VALUE;
+     // Disable the scheduled task: retention.
+     conf.setRetentionCheckIntervalInSeconds(effectivelyNever);
+     // Disable the scheduled task: compaction.
+     conf.setBrokerServiceCompactionMonitorIntervalInSeconds(effectivelyNever);
+ }
+
+ /**
+  * Asks the broker-side topic object directly for its last message id, bypassing the client.
+  *
+  * @param topicName topic to look up (the lookup is done with the flag {@code false})
+  * @return the id reported by {@code Topic.getLastMessageId()}, cast to {@link MessageIdImpl}
+  */
+ private MessageIdImpl getLastMessageIdByTopic(String topicName) throws Exception{
+     Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
+     MessageId lastMessageId = topic.getLastMessageId().get();
+     return (MessageIdImpl) lastMessageId;
+ }
+
+ /**
+  * Triggers compaction on the topic and waits until the mark-delete position of the
+  * {@code Compactor.COMPACTION_SUBSCRIPTION} cursor equals the managed ledger's last confirmed entry.
+  *
+  * @param topicName topic to compact
+  */
+ private void triggerCompactionAndWait(String topicName) throws Exception {
+     Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
+     PersistentTopic persistentTopic = (PersistentTopic) topic;
+     persistentTopic.triggerCompaction();
+     Awaitility.await().untilAsserted(() -> {
+         PositionImpl lastConfirmed =
+                 (PositionImpl) persistentTopic.getManagedLedger().getLastConfirmedEntry();
+         PositionImpl compactedUpTo = (PositionImpl) persistentTopic
+                 .getSubscription(Compactor.COMPACTION_SUBSCRIPTION)
+                 .getCursor()
+                 .getMarkDeletedPosition();
+         assertEquals(compactedUpTo.getLedgerId(), lastConfirmed.getLedgerId());
+         assertEquals(compactedUpTo.getEntryId(), lastConfirmed.getEntryId());
+     });
+ }
+
+ /**
+  * Unloads the topic through the admin API and then polls until a lookup of the topic completes
+  * normally, yields a topic, and that topic's managed ledger is in state {@code LedgerOpened}.
+  *
+  * @param topicName topic to unload and wait for
+  */
+ private void triggerLedgerSwitch(String topicName) throws Exception{
+     admin.topics().unload(topicName);
+     Awaitility.await().until(() -> {
+         CompletableFuture<Optional<Topic>> lookup =
+                 pulsar.getBrokerService().getTopic(topicName, false);
+         boolean completedNormally = lookup.isDone() && !lookup.isCompletedExceptionally();
+         if (completedNormally) {
+             Optional<Topic> maybeTopic = lookup.join();
+             if (maybeTopic.isPresent()) {
+                 PersistentTopic reloaded = (PersistentTopic) maybeTopic.get();
+                 ManagedLedgerImpl ledger = (ManagedLedgerImpl) reloaded.getManagedLedger();
+                 return ledger.getState() == ManagedLedgerImpl.State.LedgerOpened;
+             }
+         }
+         return false;
+     });
+ }
+
+ /**
+  * Repeatedly asks the managed ledger to trim consumed ledgers until exactly one ledger is left,
+  * giving up after 10 seconds.
+  *
+  * @param topicName topic whose managed ledger is trimmed
+  */
+ private void clearAllTheLedgersOutdated(String topicName) throws Exception {
+     PersistentTopic persistentTopic =
+             (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
+     ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+     Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+         // Diamond operator instead of the raw type, so the assignment is checked by the compiler.
+         CompletableFuture<Void> future = new CompletableFuture<>();
+         managedLedger.trimConsumedLedgersInBackground(future);
+         // Wait for this trim round to finish before counting the remaining ledgers.
+         future.join();
+         return managedLedger.getLedgersInfo().size() == 1;
+     });
+ }
+
+ /**
+  * On a fresh topic with nothing published, a read-compacted consumer must report a last
+  * message id whose ledger id and entry id are both {@code -1}.
+  */
+ @Test
+ public void testGetLastMessageIdWhenLedgerEmpty() throws Exception {
+     final String topic = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+     final Consumer<String> consumer = createConsumer(topic, "sub");
+
+     MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId();
+     assertEquals(lastMessageId.getLedgerId(), -1);
+     assertEquals(lastMessageId.getEntryId(), -1);
+
+     // cleanup.
+     consumer.close();
+     admin.topics().delete(topic, false);
+ }
+
+ /**
+  * Creates a {@code String} producer for the given topic.
+  *
+  * <p>When batching is enabled, the byte limit, the message-count limit and the publish delay
+  * are all raised far beyond what the tests produce. The intent is that a batch is closed only
+  * by an explicit {@code producer.flush()}.
+  *
+  * @param enabledBatch whether the producer batches messages
+  * @param topicName    topic to publish to
+  * @return the created producer
+  */
+ private Producer<String> createProducer(boolean enabledBatch, String topicName) throws Exception {
+     ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING)
+             .topic(topicName)
+             .enableBatching(enabledBatch);
+     if (enabledBatch) {
+         // Raise each automatic flush trigger: size, message count and delay.
+         producerBuilder.batchingMaxBytes(Integer.MAX_VALUE)
+                 .batchingMaxMessages(Integer.MAX_VALUE)
+                 .batchingMaxPublishDelay(3, TimeUnit.HOURS);
+     }
+     return producerBuilder.create();
+ }
+
+ /**
+  * Subscribes a {@code String} consumer with {@code readCompacted(true)} and a receiver queue
+  * size of 1.
+  *
+  * @param topicName topic to subscribe to
+  * @param subName   subscription name
+  * @return the subscribed consumer
+  */
+ private Consumer<String> createConsumer(String topicName, String subName) throws Exception {
+     return pulsarClient.newConsumer(Schema.STRING)
+             .readCompacted(true)
+             .receiverQueueSize(1)
+             .subscriptionName(subName)
+             .topic(topicName)
+             .subscribe();
+ }
+
+ /**
+  * Publishes one message, reads it, unloads the topic and trims ledgers until a single ledger
+  * remains; the reader's consumer must then report a last message id of ledger {@code -1},
+  * entry {@code -1}. The reader is created with {@code readCompacted(false)}.
+  */
+ @Test
+ public void testGetLastMessageIdWhenNoNonEmptyLedgerExists() throws Exception {
+     final String topic = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+     final String subscription = "sub";
+     ReaderImpl<String> reader = (ReaderImpl<String>) pulsarClient.newReader(Schema.STRING)
+             .topic(topic)
+             .startMessageId(MessageId.earliest)
+             .subscriptionName(subscription)
+             .readCompacted(false)
+             .receiverQueueSize(1)
+             .create();
+
+     Producer<String> producer = createProducer(false, topic);
+
+     producer.newMessage().key("k0").value("v0").sendAsync().get();
+     reader.readNext();
+     triggerLedgerSwitch(topic);
+     clearAllTheLedgersOutdated(topic);
+
+     MessageIdImpl lastMessageId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
+     assertEquals(lastMessageId.getLedgerId(), -1);
+     assertEquals(lastMessageId.getEntryId(), -1);
+
+     // cleanup.
+     reader.close();
+     producer.close();
+     admin.topics().delete(topic, false);
+ }
+
+ /**
+  * Data provider that runs a test twice: once with batching enabled and once with it disabled.
+  */
+ @DataProvider(name = "enabledBatch")
+ public Object[][] enabledBatch(){
+     Object[][] batchingModes = {
+             {true},
+             {false}
+     };
+     return batchingModes;
+ }
+
+ /**
+  * Without any compaction, the last message id seen by a read-compacted consumer must equal the
+  * one reported by the broker-side topic. With batching enabled, batch size and batch index
+  * must match as well.
+  *
+  * <p>Three values are sent for key {@code k0} and three for {@code k1}, with a flush after
+  * each key.
+  */
+ @Test(dataProvider = "enabledBatch")
+ public void testGetLastMessageIdBeforeCompaction(boolean enabledBatch) throws Exception {
+     final String topic = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+     final Consumer<String> consumer = createConsumer(topic, "sub");
+     final Producer<String> producer = createProducer(enabledBatch, topic);
+
+     List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
+     for (String key : new String[]{"k0", "k1"}) {
+         for (String value : new String[]{"v0", "v1", "v2"}) {
+             pendingSends.add(producer.newMessage().key(key).value(value).sendAsync());
+         }
+         producer.flush();
+     }
+     FutureUtil.waitForAll(pendingSends).join();
+
+     MessageIdImpl expected = getLastMessageIdByTopic(topic);
+     MessageIdImpl actual = (MessageIdImpl) consumer.getLastMessageId();
+     assertEquals(actual.getLedgerId(), expected.getLedgerId());
+     assertEquals(actual.getEntryId(), expected.getEntryId());
+     if (enabledBatch) {
+         BatchMessageIdImpl expectedBatchId = (BatchMessageIdImpl) expected;
+         BatchMessageIdImpl actualBatchId = (BatchMessageIdImpl) actual;
+         assertEquals(actualBatchId.getBatchSize(), expectedBatchId.getBatchSize());
+         assertEquals(actualBatchId.getBatchIndex(), expectedBatchId.getBatchIndex());
+     }
+
+     // cleanup.
+     consumer.close();
+     producer.close();
+     admin.topics().delete(topic, false);
+ }
+
+ /**
+  * After compaction, when the final value of every key is non-null, the last message id seen by
+  * a read-compacted consumer must equal the one reported by the broker-side topic. With
+  * batching enabled, batch size and batch index must match as well.
+  *
+  * <p>Three values are sent for key {@code k0} and three for {@code k1}, with a flush after
+  * each key.
+  */
+ @Test(dataProvider = "enabledBatch")
+ public void testGetLastMessageIdAfterCompaction(boolean enabledBatch) throws Exception {
+     final String topic = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+     final Consumer<String> consumer = createConsumer(topic, "sub");
+     final Producer<String> producer = createProducer(enabledBatch, topic);
+
+     List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
+     for (String key : new String[]{"k0", "k1"}) {
+         for (String value : new String[]{"v0", "v1", "v2"}) {
+             pendingSends.add(producer.newMessage().key(key).value(value).sendAsync());
+         }
+         producer.flush();
+     }
+     FutureUtil.waitForAll(pendingSends).join();
+
+     triggerCompactionAndWait(topic);
+
+     MessageIdImpl expected = getLastMessageIdByTopic(topic);
+     MessageIdImpl actual = (MessageIdImpl) consumer.getLastMessageId();
+     assertEquals(actual.getLedgerId(), expected.getLedgerId());
+     assertEquals(actual.getEntryId(), expected.getEntryId());
+     if (enabledBatch) {
+         BatchMessageIdImpl expectedBatchId = (BatchMessageIdImpl) expected;
+         BatchMessageIdImpl actualBatchId = (BatchMessageIdImpl) actual;
+         assertEquals(actualBatchId.getBatchSize(), expectedBatchId.getBatchSize());
+         assertEquals(actualBatchId.getBatchIndex(), expectedBatchId.getBatchIndex());
+     }
+
+     // cleanup.
+     consumer.close();
+     producer.close();
+     admin.topics().delete(topic, false);
+ }
+
+ /**
+  * The first flush carries three values for {@code k0}. The second flush carries {@code k1} and
+  * {@code k2}, each ending with a {@code null} value. After compaction the expected last
+  * message id is that of the third send ({@code k0}/{@code v2}).
+  */
+ @Test(dataProvider = "enabledBatch")
+ public void testGetLastMessageIdAfterCompactionEndWithNullMsg(boolean enabledBatch) throws Exception {
+     final String topic = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+     final Consumer<String> consumer = createConsumer(topic, "sub");
+     final Producer<String> producer = createProducer(enabledBatch, topic);
+
+     List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
+     for (String value : new String[]{"v0", "v1", "v2"}) {
+         pendingSends.add(producer.newMessage().key("k0").value(value).sendAsync());
+     }
+     producer.flush();
+     for (String key : new String[]{"k1", "k2"}) {
+         // The trailing null is the last value written for the key.
+         for (String value : new String[]{"v0", "v1", null}) {
+             pendingSends.add(producer.newMessage().key(key).value(value).sendAsync());
+         }
+     }
+     producer.flush();
+     FutureUtil.waitForAll(pendingSends).join();
+
+     triggerCompactionAndWait(topic);
+
+     MessageIdImpl expected = (MessageIdImpl) pendingSends.get(2).get();
+     MessageIdImpl actual = (MessageIdImpl) consumer.getLastMessageId();
+     assertEquals(actual.getLedgerId(), expected.getLedgerId());
+     assertEquals(actual.getEntryId(), expected.getEntryId());
+     if (enabledBatch) {
+         BatchMessageIdImpl expectedBatchId = (BatchMessageIdImpl) expected;
+         BatchMessageIdImpl actualBatchId = (BatchMessageIdImpl) actual;
+         assertEquals(actualBatchId.getBatchSize(), expectedBatchId.getBatchSize());
+         assertEquals(actualBatchId.getBatchIndex(), expectedBatchId.getBatchIndex());
+     }
+
+     // cleanup.
+     consumer.close();
+     producer.close();
+     admin.topics().delete(topic, false);
+ }
+
+ /**
+  * The first flush carries two values for {@code k0}. The second flush carries three values for
+  * {@code k1}, followed by {@code k2} ending with a {@code null} value. After compaction the
+  * expected last message id is that of the fifth send ({@code k1}/{@code v2}).
+  */
+ @Test(dataProvider = "enabledBatch")
+ public void testGetLastMessageIdAfterCompactionEndWithNullMsg2(boolean enabledBatch) throws Exception {
+     final String topic = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+     final Consumer<String> consumer = createConsumer(topic, "sub");
+     final Producer<String> producer = createProducer(enabledBatch, topic);
+
+     List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
+     for (String value : new String[]{"v0", "v1"}) {
+         pendingSends.add(producer.newMessage().key("k0").value(value).sendAsync());
+     }
+     producer.flush();
+     for (String value : new String[]{"v0", "v1", "v2"}) {
+         pendingSends.add(producer.newMessage().key("k1").value(value).sendAsync());
+     }
+     // The trailing null is the last value written for k2.
+     for (String value : new String[]{"v0", "v1", null}) {
+         pendingSends.add(producer.newMessage().key("k2").value(value).sendAsync());
+     }
+     producer.flush();
+     FutureUtil.waitForAll(pendingSends).join();
+
+     triggerCompactionAndWait(topic);
+
+     MessageIdImpl expected = (MessageIdImpl) pendingSends.get(4).get();
+     MessageIdImpl actual = (MessageIdImpl) consumer.getLastMessageId();
+     assertEquals(actual.getLedgerId(), expected.getLedgerId());
+     assertEquals(actual.getEntryId(), expected.getEntryId());
+     if (enabledBatch) {
+         BatchMessageIdImpl expectedBatchId = (BatchMessageIdImpl) expected;
+         BatchMessageIdImpl actualBatchId = (BatchMessageIdImpl) actual;
+         assertEquals(actualBatchId.getBatchSize(), expectedBatchId.getBatchSize());
+         assertEquals(actualBatchId.getBatchIndex(), expectedBatchId.getBatchIndex());
+     }
+
+     // cleanup.
+     consumer.close();
+     producer.close();
+     admin.topics().delete(topic, false);
+ }
+
+ /**
+  * Every key ({@code k0}, {@code k1}, {@code k2}) is written once with a value and then with
+  * {@code null}. After compaction the consumer must report a non-batch last message id whose
+  * ledger id and entry id are both {@code -1}.
+  */
+ @Test(dataProvider = "enabledBatch")
+ public void testGetLastMessageIdAfterCompactionAllNullMsg(boolean enabledBatch) throws Exception {
+     final String topic = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");
+     final Consumer<String> consumer = createConsumer(topic, "sub");
+     final Producer<String> producer = createProducer(enabledBatch, topic);
+
+     List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
+     for (String value : new String[]{"v0", null}) {
+         pendingSends.add(producer.newMessage().key("k0").value(value).sendAsync());
+     }
+     producer.flush();
+     for (String key : new String[]{"k1", "k2"}) {
+         for (String value : new String[]{"v0", null}) {
+             pendingSends.add(producer.newMessage().key(key).value(value).sendAsync());
+         }
+     }
+     producer.flush();
+     FutureUtil.waitForAll(pendingSends).join();
+
+     triggerCompactionAndWait(topic);
+
+     MessageIdImpl actual = (MessageIdImpl) consumer.getLastMessageId();
+     assertFalse(actual instanceof BatchMessageIdImpl);
+     assertEquals(actual.getLedgerId(), -1);
+     assertEquals(actual.getEntryId(), -1);
+
+     // cleanup.
+     consumer.close();
+     producer.close();
+     admin.topics().delete(topic, false);
+ }
+}