You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:06 UTC

[pulsar] 20/38: Fix some empty message related problems in the compacted topic. (#6795)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0ca8e8a8848fbbe1040e10c175c299a5c712182
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Apr 23 15:43:16 2020 +0800

    Fix some empty message related problems in the compacted topic. (#6795)
    
    ### Motivation
    
    Fix some empty message related problems in the compacted topic.
    
    1. Fix deletion of a key when the empty (delete-marker) message is part of a batch message.
    2. Fix compaction when all messages in the topic are empty. If all messages are empty, the compaction should delete all messages in the compacted topic. Without this fix, the compaction task fails with an NPE and the consumer still receives all messages.
    3. Seek to the compaction horizon when the last compaction task deletes all messages from the compacted topic (i.e. all previous messages were deleted by empty messages). Without this fix, the consumer gets stuck because there are no entries in the ledger used by the compacted topic.
    
    ### Verifying this change
    
    Added unit tests for the changes.
    (cherry picked from commit 53407fc598286690790727635bb5067a7ac108e7)
---
 .../pulsar/client/impl/RawBatchConverter.java      |  10 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  46 +--
 .../pulsar/compaction/TwoPhaseCompactor.java       |  19 +-
 .../apache/pulsar/client/impl/RawReaderTest.java   |  19 +-
 .../apache/pulsar/compaction/CompactionTest.java   | 340 +++++++++++++++++++++
 5 files changed, 397 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 8c21a73..4ad65af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.function.BiPredicate;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -52,7 +52,7 @@ public class RawBatchConverter {
         }
     }
 
-    public static List<ImmutablePair<MessageId,String>> extractIdsAndKeys(RawMessage msg)
+    public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSize(RawMessage msg)
             throws IOException {
         checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
 
@@ -66,7 +66,7 @@ public class RawBatchConverter {
         ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
         metadata.recycle();
 
-        List<ImmutablePair<MessageId,String>> idsAndKeys = new ArrayList<>();
+        List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = new ArrayList<>();
 
         for (int i = 0; i < batchSize; i++) {
             SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata.newBuilder();
@@ -78,13 +78,13 @@ public class RawBatchConverter {
                                                   msg.getMessageIdData().getPartition(),
                                                   i);
             if (!singleMessageMetadataBuilder.getCompactedOut()) {
-                idsAndKeys.add(ImmutablePair.of(id, singleMessageMetadataBuilder.getPartitionKey()));
+                idsAndKeysAndSize.add(ImmutableTriple.of(id, singleMessageMetadataBuilder.getPartitionKey(), singleMessageMetadataBuilder.getPayloadSize()));
             }
             singleMessageMetadataBuilder.recycle();
             singleMessagePayload.release();
         }
         uncompressedPayload.release();
-        return idsAndKeys;
+        return idsAndKeysAndSize;
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 22efe8e..2d430a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ComparisonChain;
 
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -88,28 +89,35 @@ public class CompactedTopicImpl implements CompactedTopic {
                 cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
             } else {
                 compactedTopicContext.thenCompose(
-                        (context) -> {
-                            return findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
-                                .thenCompose((startPoint) -> {
-                                        if (startPoint == NEWER_THAN_COMPACTED) {
-                                            cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
-                                            return CompletableFuture.completedFuture(null);
-                                        } else {
-                                            long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
-                                                                     startPoint + numberOfEntriesToRead);
-                                            return readEntries(context.ledger, startPoint, endPoint)
-                                                .thenAccept((entries) -> {
-                                                        Entry lastEntry = entries.get(entries.size() - 1);
-                                                        cursor.seek(lastEntry.getPosition().getNext());
-                                                        callback.readEntriesComplete(entries, ctx);
-                                                    });
-                                        }
+                    (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
+                        .thenCompose((startPoint) -> {
+                            if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) {
+                                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
+                                return CompletableFuture.completedFuture(null);
+                            } else {
+                                long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
+                                                         startPoint + numberOfEntriesToRead);
+                                if (startPoint == NEWER_THAN_COMPACTED) {
+                                    cursor.seek(compactionHorizon.getNext());
+                                    callback.readEntriesComplete(Collections.emptyList(), ctx);
+                                }
+                                return readEntries(context.ledger, startPoint, endPoint)
+                                    .thenAccept((entries) -> {
+                                        Entry lastEntry = entries.get(entries.size() - 1);
+                                        cursor.seek(lastEntry.getPosition().getNext());
+                                        callback.readEntriesComplete(entries, ctx);
                                     });
-                                })
+                            }
+                        }))
                     .exceptionally((exception) -> {
+                        if (exception.getCause() instanceof NoSuchElementException) {
+                            cursor.seek(compactionHorizon.getNext());
+                            callback.readEntriesComplete(Collections.emptyList(), ctx);
+                        } else {
                             callback.readEntriesFailed(new ManagedLedgerException(exception), ctx);
-                            return null;
-                        });
+                        }
+                        return null;
+                    });
             }
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index a275bb5..df7c79b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.MessageId;
@@ -128,8 +129,17 @@ public class TwoPhaseCompactor extends Compactor {
                         boolean deletedMessage = false;
                         if (RawBatchConverter.isReadableBatch(m)) {
                             try {
-                                RawBatchConverter.extractIdsAndKeys(m)
-                                    .forEach(e -> latestForKey.put(e.getRight(), e.getLeft()));
+                                for (ImmutableTriple<MessageId, String, Integer> e :
+                                        RawBatchConverter.extractIdsAndKeysAndSize(m)) {
+                                    if (e != null) {
+                                        if (e.getRight() > 0) {
+                                            latestForKey.put(e.getMiddle(), e.getLeft());
+                                        } else {
+                                            deletedMessage = true;
+                                            latestForKey.remove(e.getMiddle());
+                                        }
+                                    }
+                                }
                             } catch (IOException ioe) {
                                 log.info("Error decoding batch for message {}. Whole batch will be included in output",
                                          id, ioe);
@@ -149,7 +159,8 @@ public class TwoPhaseCompactor extends Compactor {
                         MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
                         MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
                         if (id.compareTo(lastMessageId) == 0) {
-                            loopPromise.complete(new PhaseOneResult(first, to, lastMessageId, latestForKey));
+                            loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? id : to,
+                                    lastMessageId, latestForKey));
                         } else {
                             phaseOneLoop(reader,
                                          Optional.ofNullable(first),
@@ -230,7 +241,7 @@ public class TwoPhaseCompactor extends Compactor {
                         if (RawBatchConverter.isReadableBatch(m)) {
                             try {
                                 messageToAdd = RawBatchConverter.rebatchMessage(
-                                        m, (key, subid) -> latestForKey.get(key).equals(subid));
+                                        m, (key, subid) -> subid.equals(latestForKey.get(key)));
                             } catch (IOException ioe) {
                                 log.info("Error decoding batch for message {}. Whole batch will be included in output",
                                         id, ioe);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 5ae4618..2bdd488 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
@@ -252,10 +253,10 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         while (true) {
             try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
                 Assert.assertTrue(RawBatchConverter.isReadableBatch(m));
-                List<ImmutablePair<MessageId, String>> batchKeys = RawBatchConverter.extractIdsAndKeys(m);
+                List<ImmutableTriple<MessageId, String, Integer>> batchKeys = RawBatchConverter.extractIdsAndKeysAndSize(m);
                 // Assert each key is unique
-                for (ImmutablePair<MessageId, String> pair : batchKeys) {
-                    String key = pair.right;
+                for (ImmutableTriple<MessageId, String, Integer> pair : batchKeys) {
+                    String key = pair.middle;
                     Assert.assertTrue(
                             keys.add(key),
                             "Received duplicated key '" + key + "' : already received keys = " + keys);
@@ -285,7 +286,7 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
 
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
         try (RawMessage m = reader.readNextAsync().get()) {
-            List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m);
+            List<ImmutableTriple<MessageId, String, Integer>> idsAndKeys = RawBatchConverter.extractIdsAndKeysAndSize(m);
 
             Assert.assertEquals(idsAndKeys.size(), 3);
 
@@ -294,9 +295,9 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
             Assert.assertTrue(idsAndKeys.get(1).getLeft().compareTo(idsAndKeys.get(2).getLeft()) < 0);
 
             // assert keys are as expected
-            Assert.assertEquals(idsAndKeys.get(0).getRight(), "key1");
-            Assert.assertEquals(idsAndKeys.get(1).getRight(), "key2");
-            Assert.assertEquals(idsAndKeys.get(2).getRight(), "key3");
+            Assert.assertEquals(idsAndKeys.get(0).getMiddle(), "key1");
+            Assert.assertEquals(idsAndKeys.get(1).getMiddle(), "key2");
+            Assert.assertEquals(idsAndKeys.get(2).getMiddle(), "key3");
         } finally {
             reader.closeAsync().get();
         }
@@ -321,9 +322,9 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
         try (RawMessage m1 = reader.readNextAsync().get()) {
             RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
-            List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2);
+            List<ImmutableTriple<MessageId, String, Integer>> idsAndKeys = RawBatchConverter.extractIdsAndKeysAndSize(m2);
             Assert.assertEquals(idsAndKeys.size(), 1);
-            Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
+            Assert.assertEquals(idsAndKeys.get(0).getMiddle(), "key2");
             m2.close();
             Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
         } finally {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 233246c..2505a6e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.compaction;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
@@ -37,6 +39,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -55,12 +60,15 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1308,4 +1316,336 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
+    public void testAllEmptyCompactionLedger(boolean batchEnabled) throws Exception {
+        final String topic = "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID().toString();
+
+        final int messages = 10;
+
+        // 1.create producer and publish message to the topic.
+        ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic);
+        if (!batchEnabled) {
+            builder.enableBatching(false);
+        } else {
+            builder.batchingMaxMessages(messages / 5);
+        }
+
+        Producer<byte[]> producer = builder.create();
+
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.newMessage().keyBytes("1".getBytes()).value("".getBytes()).sendAsync());
+        }
+
+        FutureUtil.waitForAll(futures).get();
+
+        // 2.compact the topic.
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
+            Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(m);
+        }
+    }
+
+    @Test(timeOut = 20000)
+    public void testBatchAndNonBatchWithoutEmptyPayload() throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "persistent://my-property/use/my-ns/testBatchAndNonBatchWithoutEmptyPayload" + UUID.randomUUID().toString();
+
+        // 1.create producer and publish message to the topic.
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.DAYS)
+                .create();
+
+        final String k1 = "k1";
+        final String k2 = "k2";
+        producer.newMessage().key(k1).value("0".getBytes()).send();
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(7);
+        for (int i = 0; i < 2; i++) {
+            futures.add(producer.newMessage().key(k1).value((i + 1 + "").getBytes()).sendAsync());
+        }
+        producer.flush();
+        producer.newMessage().key(k1).value("3".getBytes()).send();
+        for (int i = 0; i < 2; i++) {
+            futures.add(producer.newMessage().key(k1).value((i + 4 + "").getBytes()).sendAsync());
+        }
+        producer.flush();
+
+        for (int i = 0; i < 3; i++) {
+            futures.add(producer.newMessage().key(k2).value((i + "").getBytes()).sendAsync());
+        }
+
+        producer.newMessage().key(k2).value("3".getBytes()).send();
+        producer.flush();
+        FutureUtil.waitForAll(futures).get();
+
+        // 2.compact the topic.
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
+            Message<byte[]> m1 = consumer.receive(2, TimeUnit.SECONDS);
+            Message<byte[]> m2 = consumer.receive(2, TimeUnit.SECONDS);
+            assertNotNull(m1);
+            assertNotNull(m2);
+            assertEquals(m1.getKey(), k1);
+            assertEquals(new String(m1.getValue()), "5");
+            assertEquals(m2.getKey(), k2);
+            assertEquals(new String(m2.getValue()), "3");
+            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(none);
+        }
+    }
+    @Test(timeOut = 20000)
+    public void testBatchAndNonBatchWithEmptyPayload() throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" + UUID.randomUUID().toString();
+
+        // 1.create producer and publish message to the topic.
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.DAYS)
+                .create();
+
+        final String k1 = "k1";
+        final String k2 = "k2";
+        final String k3 = "k3";
+        producer.newMessage().key(k1).value("0".getBytes()).send();
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(7);
+        for (int i = 0; i < 2; i++) {
+            futures.add(producer.newMessage().key(k1).value((i + 1 + "").getBytes()).sendAsync());
+        }
+        producer.flush();
+        producer.newMessage().key(k1).value("3".getBytes()).send();
+        for (int i = 0; i < 2; i++) {
+            futures.add(producer.newMessage().key(k1).value((i + 4 + "").getBytes()).sendAsync());
+        }
+        producer.flush();
+
+        for (int i = 0; i < 3; i++) {
+            futures.add(producer.newMessage().key(k2).value((i + 10 + "").getBytes()).sendAsync());
+        }
+        producer.flush();
+
+        producer.newMessage().key(k2).value("".getBytes()).send();
+
+        producer.newMessage().key(k3).value("0".getBytes()).send();
+
+        FutureUtil.waitForAll(futures).get();
+
+        // 2.compact the topic.
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
+            Message<byte[]> m1 = consumer.receive();
+            Message<byte[]> m2 = consumer.receive();
+            assertNotNull(m1);
+            assertNotNull(m2);
+            assertEquals(m1.getKey(), k1);
+            assertEquals(m2.getKey(), k3);
+            assertEquals(new String(m1.getValue()), "5");
+            assertEquals(new String(m2.getValue()), "0");
+            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(none);
+        }
+    }
+
+    @Test(timeOut = 20000)
+    public void testBatchAndNonBatchEndOfEmptyPayload() throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "persistent://my-property/use/my-ns/testBatchAndNonBatchWithEmptyPayload" + UUID.randomUUID().toString();
+
+        // 1.create producer and publish message to the topic.
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.DAYS)
+                .create();
+
+        final String k1 = "k1";
+        final String k2 = "k2";
+        producer.newMessage().key(k1).value("0".getBytes()).send();
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(7);
+        for (int i = 0; i < 2; i++) {
+            futures.add(producer.newMessage().key(k1).value((i + 1 + "").getBytes()).sendAsync());
+        }
+        producer.flush();
+        producer.newMessage().key(k1).value("3".getBytes()).send();
+        for (int i = 0; i < 2; i++) {
+            futures.add(producer.newMessage().key(k1).value((i + 4 + "").getBytes()).sendAsync());
+        }
+        producer.flush();
+
+        for (int i = 0; i < 3; i++) {
+            futures.add(producer.newMessage().key(k2).value((i + 10 + "").getBytes()).sendAsync());
+        }
+        producer.flush();
+
+        producer.newMessage().key(k2).value("".getBytes()).send();
+
+        FutureUtil.waitForAll(futures).get();
+
+        // 2.compact the topic.
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // consumer with readCompacted enabled only get compacted entries
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
+            Message<byte[]> m1 = consumer.receive();
+            assertNotNull(m1);
+            assertEquals(m1.getKey(), k1);
+            assertEquals(new String(m1.getValue()), "5");
+            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(none);
+        }
+    }
+
+    @Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
+    public void testCompactMultipleTimesWithoutEmptyMessage(boolean batchEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID().toString();
+
+        final int messages = 10;
+        final String key = "1";
+
+        // 1.create producer and publish message to the topic.
+        ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic);
+        if (!batchEnabled) {
+            builder.enableBatching(false);
+        } else {
+            builder.batchingMaxMessages(messages / 5);
+        }
+
+        Producer<byte[]> producer = builder.create();
+
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.newMessage().key(key).value((i + "").getBytes()).sendAsync());
+        }
+
+        FutureUtil.waitForAll(futures).get();
+
+        // 2.compact the topic.
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // 3. Send ten more messages
+        futures.clear();
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.newMessage().key(key).value((i + 10 + "").getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        // 4.compact again.
+        compactor.compact(topic).get();
+
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
+            Message<byte[]> m1 = consumer.receive();
+            assertNotNull(m1);
+            assertEquals(m1.getKey(), key);
+            assertEquals(new String(m1.getValue()), "19");
+            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(none);
+        }
+    }
+
+    @Test(timeOut = 2000000, dataProvider = "lastDeletedBatching")
+    public void testReadUnCompacted(boolean batchEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
+        final String topic = "persistent://my-property/use/my-ns/testReadUnCompacted" + UUID.randomUUID().toString();
+
+        final int messages = 10;
+        final String key = "1";
+
+        // 1.create producer and publish message to the topic.
+        ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic);
+        if (!batchEnabled) {
+            builder.enableBatching(false);
+        } else {
+            builder.batchingMaxMessages(messages / 5);
+        }
+
+        Producer<byte[]> producer = builder.create();
+
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.newMessage().key(key).value((i + "").getBytes()).sendAsync());
+        }
+
+        FutureUtil.waitForAll(futures).get();
+
+        // 2.compact the topic.
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compactor.compact(topic).get();
+
+        // 3. Send ten more messages
+        futures.clear();
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.newMessage().key(key).value((i + 10 + "").getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(futures).get();
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+            for (int i = 0; i < 11; i++) {
+                Message<byte[]> received = consumer.receive();
+                assertNotNull(received);
+                assertEquals(received.getKey(), key);
+                assertEquals(new String(received.getValue()), i + 9 + "");
+                consumer.acknowledge(received);
+            }
+            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(none);
+        }
+
+        // 4.Send empty message to delete the key-value in the compacted topic.
+        producer.newMessage().key(key).value(("").getBytes()).send();
+
+        // 5.compact the topic.
+        compactor.compact(topic).get();
+
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub2")
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(none);
+        }
+
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.newMessage().key(key).value((i + 20 + "").getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub3")
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+            for (int i = 0; i < 10; i++) {
+                Message<byte[]> received = consumer.receive();
+                assertNotNull(received);
+                assertEquals(received.getKey(), key);
+                assertEquals(new String(received.getValue()), i + 20 + "");
+                consumer.acknowledge(received);
+            }
+            Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
+            assertNull(none);
+        }
+    }
 }
\ No newline at end of file