You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/21 01:29:43 UTC

[pulsar] branch master updated: [improve][broker][client] PIP-215 Added TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView. (#18195)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 05e6f5e3c71 [improve][broker][client] PIP-215 Added TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView. (#18195)
05e6f5e3c71 is described below

commit 05e6f5e3c71426ee30a8ad1852456f7be897bdf3
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Dec 20 17:29:34 2022 -0800

    [improve][broker][client] PIP-215 Added TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView. (#18195)
---
 pulsar-broker/pom.xml                              |   1 -
 .../pulsar/client/impl/CompactionReaderImpl.java   |  95 +++++
 .../client/impl/RawBatchMessageContainerImpl.java  | 171 ++++++++
 .../org/apache/pulsar/compaction/Compactor.java    |   4 +-
 .../compaction/StrategicTwoPhaseCompactor.java     | 434 +++++++++++++++++++++
 .../pulsar/compaction/TwoPhaseCompactor.java       |   8 +-
 .../client/impl/CompactionReaderImplTest.java      | 111 ++++++
 .../client/impl/ProducerMemoryLimitTest.java       |   8 +
 .../pulsar/client/impl/ProducerSemaphoreTest.java  |   8 +
 .../impl/RawBatchMessageContainerImplTest.java     | 283 ++++++++++++++
 .../apache/pulsar/client/impl/TableViewTest.java   |  48 +++
 .../pulsar/compaction/CompactionRetentionTest.java |  24 +-
 .../apache/pulsar/compaction/CompactionTest.java   | 263 +++++++------
 .../apache/pulsar/compaction/CompactorTest.java    |  35 +-
 .../compaction/NumericOrderCompactionStrategy.java |  38 ++
 .../StrategicCompactionRetentionTest.java          |  45 +++
 .../pulsar/compaction/StrategicCompactionTest.java | 152 ++++++++
 .../pulsar/compaction/StrategicCompactorTest.java  |  49 +++
 .../compaction/TopicCompactionStrategyTest.java    |  54 +++
 .../org/apache/pulsar/client/api/TableView.java    |  10 +-
 .../client/impl/BatchMessageContainerImpl.java     |  33 +-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   4 +-
 .../client/impl/TableViewConfigurationData.java    |   2 +
 .../apache/pulsar/client/impl/TableViewImpl.java   |  70 +++-
 .../client/impl/conf/ReaderConfigurationData.java  |   6 +
 .../common/topics/TopicCompactionStrategy.java     |  76 ++++
 26 files changed, 1857 insertions(+), 175 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 136bd670d5e..672c11da03e 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -182,7 +182,6 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-messagecrypto-bc</artifactId>
       <version>${project.version}</version>
-      <scope>test</scope>
     </dependency>
 
     <dependency>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java
new file mode 100644
index 00000000000..5961cf0fae7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.CommandAck;
+
+/**
+ * A {@link ReaderImpl} extension used by StrategicTwoPhaseCompactor.
+ * The compaction consumer subscription is durable and consumes compacted messages from the earliest position.
+ * Reads do not acknowledge messages; callers must ack explicitly through acknowledgeCumulativeAsync.
+ */
+@Slf4j
+public class CompactionReaderImpl<T> extends ReaderImpl<T> {
+
+    ConsumerBase<T> consumer; // obtained from getConsumer(); every read and ack below goes through it
+
+    ReaderConfigurationData<T> readerConfiguration; // the configuration this reader was created with
+    private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration,
+                                 ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture,
+                                 Schema<T> schema) {
+        super(client, readerConfiguration, executorProvider, consumerFuture, schema);
+        this.readerConfiguration = readerConfiguration;
+        this.consumer = getConsumer();
+    }
+
+    public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic,
+                                                     CompletableFuture<Consumer<T>> consumerFuture,
+                                                     CryptoKeyReader cryptoKeyReader) {
+        ReaderConfigurationData<T> conf = new ReaderConfigurationData<>();
+        conf.setTopicName(topic);
+        conf.setSubscriptionName(COMPACTION_SUBSCRIPTION); // constant imported from Compactor
+        conf.setStartMessageId(MessageId.earliest);
+        conf.setStartMessageFromRollbackDurationInSec(0);
+        conf.setReadCompacted(true);
+        conf.setSubscriptionMode(SubscriptionMode.Durable);
+        conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+        conf.setCryptoKeyReader(cryptoKeyReader);
+        return new CompactionReaderImpl<>(client, conf, client.externalExecutorProvider(), consumerFuture, schema);
+    }
+
+    /** Returns the next message received by the compaction consumer; no acknowledgement is sent. */
+    @Override
+    public Message<T> readNext() throws PulsarClientException {
+        return consumer.receive();
+    }
+
+    @Override
+    public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
+        return consumer.receive(timeout, unit); // same as readNext(), with the timeout handed to the consumer
+    }
+
+    @Override
+    public CompletableFuture<Message<T>> readNextAsync() {
+        return consumer.receiveAsync(); // asynchronous variant; still no acknowledgement
+    }
+
+    public CompletableFuture<MessageId> getLastMessageIdAsync() {
+        return consumer.getLastMessageIdAsync();
+    }
+
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
+        return consumer.doAcknowledge(messageId, CommandAck.AckType.Cumulative, properties, null);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
new file mode 100644
index 00000000000..cf6b213155c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodecNone;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
+
+/**
+ * A raw batch message container without producer. (Used for StrategicTwoPhaseCompactor)
+ *
+ * incoming single messages:
+ * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
+ *
+ * batched into single batch message:
+ * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
+ */
+public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
+    MessageCrypto msgCrypto; // created by toByteBuf() for the first encrypted batch, then reused
+    Set<String> encryptionKeys; // keySet() of the last message's encryption-context keys
+    CryptoKeyReader cryptoKeyReader; // set via setCryptoKeyReader(); required for encrypted messages
+    public RawBatchMessageContainerImpl(int maxNumMessagesInBatch) {
+        super();
+        compressionType = CompressionType.NONE;
+        compressor = new CompressionCodecNone();
+        if (maxNumMessagesInBatch > 0) { // non-positive values leave the inherited limit untouched
+            this.maxNumMessagesInBatch = maxNumMessagesInBatch;
+        }
+    }
+    private ByteBuf encrypt(ByteBuf compressedPayload) {
+        if (msgCrypto == null) { // encryption was not set up: hand the payload back untouched
+            return compressedPayload;
+        }
+        int maxSize = msgCrypto.getMaxOutputSize(compressedPayload.readableBytes());
+        ByteBuf encryptedPayload = allocator.buffer(maxSize);
+        ByteBuffer targetBuffer = encryptedPayload.nioBuffer(0, maxSize);
+
+        try {
+            msgCrypto.encrypt(encryptionKeys, cryptoKeyReader, () -> messageMetadata,
+                    compressedPayload.nioBuffer(), targetBuffer);
+        } catch (PulsarClientException e) {
+            encryptedPayload.release();
+            compressedPayload.release();
+            discard(e);
+            throw new RuntimeException("Failed to encrypt payload", e);
+        }
+        encryptedPayload.writerIndex(targetBuffer.remaining());
+        compressedPayload.release(); // the input is released on success as well as on failure
+        return encryptedPayload;
+    }
+
+    @Override
+    public ProducerImpl.OpSendMsg createOpSendMsg() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Sets a CryptoKeyReader instance to encrypt batched messages during serialization, `toByteBuf()`.
+     * @param cryptoKeyReader a CryptoKeyReader instance
+     */
+    public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+        this.cryptoKeyReader = cryptoKeyReader;
+    }
+
+    /**
+     * Serializes the batched messages and returns the ByteBuf.
+     * It sets the CompressionType and Encryption Keys from the batched messages.
+     * If successful, it calls `clear()` at the end to release buffers from this container.
+     *
+     * The returned byte buffer follows this format:
+     * [IdSize][Id][metadataAndPayloadSize][metadataAndPayload].
+     * This format is the same as RawMessage.serialize()'s format
+     * as the compacted messages are deserialized as RawMessage in broker.
+     *
+     * It throws the following runtime exceptions from encryption:
+     * IllegalStateException if cryptoKeyReader is not set for encrypted messages.
+     * IllegalArgumentException if encryption key init fails.
+     * RuntimeException if message encryption fails.
+     *
+     * @return a ByteBuf instance
+     */
+    public ByteBuf toByteBuf() {
+        if (numMessagesInBatch > 1) {
+            messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+            messageMetadata.setSequenceId(lowestSequenceId);
+            messageMetadata.setHighestSequenceId(highestSequenceId);
+        }
+        MessageImpl lastMessage = messages.get(messages.size() - 1);
+        MessageIdImpl lastMessageId = (MessageIdImpl) lastMessage.getMessageId();
+        MessageMetadata lastMessageMetadata = lastMessage.getMessageBuilder();
+
+        this.compressionType = lastMessageMetadata.getCompression();
+        this.compressor = CompressionCodecProvider.getCompressionCodec(lastMessageMetadata.getCompression());
+
+        if (!lastMessage.getEncryptionCtx().isEmpty()) {
+            EncryptionContext encryptionContext = (EncryptionContext) lastMessage.getEncryptionCtx().get();
+
+            if (cryptoKeyReader == null) {
+                IllegalStateException ex =
+                        new IllegalStateException("Messages are encrypted but no cryptoKeyReader is provided.");
+                discard(ex);
+                throw ex;
+            }
+
+            encryptionKeys = encryptionContext.getKeys().keySet();
+            if (msgCrypto == null) {
+                msgCrypto =
+                        new MessageCryptoBc(String.format(
+                                "[%s] [%s]", topicName, "RawBatchMessageContainer"), true);
+                try {
+                    msgCrypto.addPublicKeyCipher(encryptionKeys, cryptoKeyReader);
+                } catch (PulsarClientException.CryptoException e) {
+                    discard(e);
+                    throw new IllegalArgumentException("Failed to set encryption keys", e);
+                }
+            }
+        }
+
+        ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
+        updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
+        ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
+                messageMetadata, encryptedPayload);
+
+        MessageIdData idData = new MessageIdData();
+        idData.setLedgerId(lastMessageId.getLedgerId());
+        idData.setEntryId(lastMessageId.getEntryId());
+        idData.setPartition(lastMessageId.getPartitionIndex());
+
+        // RawMessage.serialize() layout: [IdSize][Id][metadataAndPayloadSize][metadataAndPayload]
+        int idSize = idData.getSerializedSize();
+        int headerSize = 4 /* IdSize */ + idSize + 4 /* metadataAndPayloadSize */;
+        int totalSize = headerSize + metadataAndPayload.readableBytes();
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(totalSize);
+        buf.writeInt(idSize);
+        idData.writeTo(buf);
+        buf.writeInt(metadataAndPayload.readableBytes());
+        buf.writeBytes(metadataAndPayload);
+        metadataAndPayload.release(); // its bytes now live in buf; drop our reference so it does not leak
+        encryptedPayload.release();
+        clear();
+        return buf;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
index 522618f9d6a..e93a642c76e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
@@ -40,8 +40,8 @@ public abstract class Compactor {
 
     protected final ServiceConfiguration conf;
     protected final ScheduledExecutorService scheduler;
-    private final PulsarClient pulsar;
-    private final BookKeeper bk;
+    protected final PulsarClient pulsar;
+    protected final BookKeeper bk;
     protected final CompactorMXBeanImpl mxBean;
 
     public Compactor(ServiceConfiguration conf,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
new file mode 100644
index 00000000000..bb0850efab4
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects the message to keep (decided by TopicCompactionStrategy.shouldKeepLeft())
+ * for each key in the topic. Then, the second pass writes these values
+ * to a ledger.
+ *
+ * <p>As the first pass caches the entire message(not just offset) for each key into a map,
+ * this compaction could be memory intensive if the message payload is large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+    private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+    private static final int MAX_OUTSTANDING = 500;
+    private final Duration phaseOneLoopReadTimeout;
+    private final RawBatchMessageContainerImpl batchMessageContainer;
+
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler,
+                                      int maxNumMessagesInBatch) { // values <= 0 leave the container's limit as is
+        super(conf, pulsar, bk, scheduler);
+        batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+        phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+    }
+
+    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler) {
+        this(conf, pulsar, bk, scheduler, -1); // -1: do not override the batch container's message limit
+    }
+
+    public CompletableFuture<Long> compact(String topic) {
+        throw new UnsupportedOperationException(); // a TopicCompactionStrategy is required: use the other overloads
+    }
+
+
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> strategy) {
+        return compact(topic, strategy, null); // null: no CryptoKeyReader is supplied
+    }
+
+    public <T> CompletableFuture<Long> compact(String topic,
+                                               TopicCompactionStrategy<T> strategy,
+                                               CryptoKeyReader cryptoKeyReader) {
+        CompletableFuture<Consumer<T>> consumerFuture = new CompletableFuture<>();
+        if (cryptoKeyReader != null) { // NOTE(review): a null argument keeps any reader set by an earlier call
+            batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+        }
+        CompactionReaderImpl<T> reader = CompactionReaderImpl.create(
+                (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture, cryptoKeyReader);
+        // Compaction starts on the scheduler once the reader's consumer future completes.
+        return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler);
+    }
+
+    <T> CompletableFuture<Long> doCompaction(Reader<T> reader, TopicCompactionStrategy strategy) {
+        // Both phases cast the reader to CompactionReaderImpl, so any other implementation is rejected here.
+        if (!(reader instanceof CompactionReaderImpl<T>)) {
+            return CompletableFuture.failedFuture(
+                    new IllegalStateException("reader has to be CompactionReaderImpl"));
+        }
+        return reader.hasMessageAvailableAsync()
+                .thenCompose(available -> {
+                    if (available) {
+                        return phaseOne(reader, strategy)
+                                .thenCompose((result) -> phaseTwo(result, reader, bk));
+                    } else {
+                        log.info("Skip compaction of the empty topic {}", reader.getTopic());
+                        return CompletableFuture.completedFuture(-1L); // -1: nothing was compacted
+                    }
+                });
+    }
+
+    <T> CompletableFuture<Long> compactAndCloseReader(Reader<T> reader, TopicCompactionStrategy strategy) {
+        CompletableFuture<Long> compactionResult = new CompletableFuture<>();
+        mxBean.addCompactionStartOp(reader.getTopic());
+        // The reader is always closed first; the caller then sees the compaction outcome, not the close outcome.
+        doCompaction(reader, strategy).whenComplete(
+                (compactedLedgerId, compactionError) -> {
+                    log.info("Completed doCompaction ledgerId:{}", compactedLedgerId);
+                    reader.closeAsync().whenComplete((ignored, closeError) -> {
+                        if (closeError != null) {
+                            log.warn("Error closing reader handle {}, ignoring", reader, closeError);
+                        }
+                        boolean succeeded = compactionError == null;
+                        mxBean.addCompactionEndOp(reader.getTopic(), succeeded);
+                        if (succeeded) {
+                            compactionResult.complete(compactedLedgerId);
+                        } else {
+                            compactionResult.completeExceptionally(compactionError);
+                        }
+                    });
+                });
+        return compactionResult;
+    }
+
+    private <T> boolean doCompactMessage(
+            Message<T> msg, PhaseOneResult<T> result, TopicCompactionStrategy<T> strategy) {
+        // Messages without a key are not cached: release them and return true.
+        String key = msg.getKey();
+        if (key == null) {
+            msg.release();
+            return true;
+        }
+        Map<String, Message<T>> latestByKey = result.cache;
+        T incoming = msg.getValue();
+        Message<T> cached = latestByKey.get(key);
+        T cachedValue = cached == null ? null : cached.getValue();
+
+        // The strategy keeps the cached ("left") message: drop the incoming one.
+        if (strategy.shouldKeepLeft(cachedValue, incoming)) {
+            msg.release();
+            result.invalidCompactionCount.incrementAndGet();
+            return false;
+        }
+
+        // The incoming message wins. Removing the key first re-inserts it at the end of the ordered cache;
+        // a null or empty payload only removes the key.
+        boolean hasPayload = incoming != null && msg.size() > 0;
+        latestByKey.remove(key);
+        if (hasPayload) {
+            latestByKey.put(key, msg);
+        } else {
+            msg.release();
+        }
+        if (cached != null) {
+            cached.release();
+        }
+        result.validCompactionCount.incrementAndGet();
+        return true;
+    }
+
+    private static class PhaseOneResult<T> {
+        MessageId firstId;
+        MessageId lastId; // last read messageId
+        Map<String, Message<T>> cache;
+
+        AtomicInteger invalidCompactionCount;
+
+        AtomicInteger validCompactionCount;
+
+        AtomicInteger numReadMessages;
+
+        String topic;
+
+        PhaseOneResult(String topic) {
+            this.topic = topic;
+            cache = new LinkedHashMap<>();
+            invalidCompactionCount = new AtomicInteger();
+            validCompactionCount = new AtomicInteger();
+            numReadMessages = new AtomicInteger();
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "{Topic:%s, firstId:%s, lastId:%s, cache.size:%d, "
+                            + "invalidCompactionCount:%d, validCompactionCount:%d, numReadMessages:%d}",
+                    topic,
+                    firstId != null ? firstId.toString() : "",
+                    lastId != null ? lastId.toString() : "",
+                    cache.size(),
+                    invalidCompactionCount.get(),
+                    validCompactionCount.get(),
+                    numReadMessages.get());
+        }
+    }
+
+
+    private <T> CompletableFuture<PhaseOneResult> phaseOne(Reader<T> reader, TopicCompactionStrategy strategy) {
+        CompletableFuture<PhaseOneResult> promise = new CompletableFuture<>();
+        PhaseOneResult<T> result = new PhaseOneResult<>(reader.getTopic());
+
+        // Capture the topic's last message id first; phaseOneLoop stops once it has read that id.
+        ((CompactionReaderImpl<T>) reader).getLastMessageIdAsync()
+                .thenAccept(lastMessageId -> {
+                    log.info("Commencing phase one of compaction for {}, reading to {}",
+                            reader.getTopic(), lastMessageId);
+                    result.lastId = copyMessageId(lastMessageId);
+                    phaseOneLoop(reader, promise, result, strategy);
+                }).exceptionally(ex -> {
+                    promise.completeExceptionally(ex);
+                    return null;
+                });
+
+        return promise;
+    }
+
+    /** Returns a new copy of a BatchMessageIdImpl or MessageIdImpl; any other MessageId type is rejected. */
+    private static MessageId copyMessageId(MessageId msgId) {
+        // BatchMessageIdImpl is tested before MessageIdImpl; the order of the two checks is kept as is.
+        if (msgId instanceof BatchMessageIdImpl batchId) {
+            return new BatchMessageIdImpl(batchId);
+        }
+        if (msgId instanceof MessageIdImpl plainId) {
+            return new MessageIdImpl(plainId.getLedgerId(), plainId.getEntryId(),
+                    plainId.getPartitionIndex());
+        }
+        throw new IllegalStateException("Unknown lastMessageId type");
+    }
+
+    private <T> void phaseOneLoop(Reader<T> reader, CompletableFuture<PhaseOneResult> promise,
+                                  PhaseOneResult<T> result, TopicCompactionStrategy<T> strategy) {
+        // Reads one message, folds it into result, and re-invokes itself until result.lastId has been read.
+        if (promise.isDone()) {
+            return;
+        }
+
+        CompletableFuture<Message<T>> future = reader.readNextAsync();
+        FutureUtil.addTimeoutHandling(future, // timeout value: phaseOneLoopReadTimeout
+                phaseOneLoopReadTimeout, scheduler,
+                () -> FutureUtil.createTimeoutException("Timeout", getClass(),
+                        "phaseOneLoop(...)"));
+
+        future.thenAcceptAsync(msg -> {
+            // Capture the id before doCompactMessage(), which may release msg.
+            MessageId id = msg.getMessageId();
+            boolean completed = false;
+            if (result.lastId.compareTo(id) == 0) { // this is the id phase one was told to stop at
+                completed = true;
+            }
+
+            result.numReadMessages.incrementAndGet();
+            mxBean.addCompactionReadOp(reader.getTopic(), msg.size());
+            if (doCompactMessage(msg, result, strategy)) { // true unless the strategy kept the cached message
+                mxBean.addCompactionRemovedEvent(reader.getTopic());
+            }
+            // First message only: record its id and seek the reader to it.
+            if (result.firstId == null) {
+                result.firstId = copyMessageId(id);
+                log.info("Resetting cursor to firstId:{}", result.firstId);
+                try {
+                    reader.seek(result.firstId);
+                } catch (PulsarClientException e) {
+                    throw new RuntimeException("Failed to reset the cursor to firstId:" + result.firstId, e);
+                }
+            }
+            if (completed) {
+                promise.complete(result);
+            } else {
+                phaseOneLoop(reader, promise, result, strategy);
+            }
+
+        }, scheduler).exceptionally(ex -> { // any failure above fails the whole phase
+            promise.completeExceptionally(ex);
+            return null;
+        });
+
+    }
+
+    private <T> CompletableFuture<Long> phaseTwo(PhaseOneResult<T> phaseOneResult, Reader<T> reader, BookKeeper bk) {
+        log.info("Completed phase one. Result:{}. ", phaseOneResult);
+        // The compacted ledger's metadata is built from the topic name and the serialized last message id.
+        Map<String, byte[]> ledgerMetadata = LedgerMetadataUtils.buildMetadataForCompactedLedger(
+                phaseOneResult.topic, phaseOneResult.lastId.toByteArray());
+        return createLedger(bk, ledgerMetadata)
+                .thenCompose(compactedLedger -> {
+                    log.info(
+                            "Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
+                            phaseOneResult.topic, phaseOneResult.firstId, phaseOneResult.lastId,
+                            phaseOneResult.cache.size(), compactedLedger.getId());
+                    return runPhaseTwo(phaseOneResult, reader, compactedLedger, bk);
+                });
+    }
+
+    private <T> CompletableFuture<Long> runPhaseTwo(
+            PhaseOneResult<T> phaseOneResult, Reader<T> reader, LedgerHandle ledger, BookKeeper bk) {
+        // Pipeline: write cached messages -> flush the batch container -> ack with the ledger id -> close ledger.
+        CompletableFuture<Long> promise = new CompletableFuture<>();
+        Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+        CompletableFuture<Void> loopPromise = new CompletableFuture<>();
+        phaseTwoLoop(phaseOneResult.topic, phaseOneResult.cache.values().iterator(), ledger,
+                outstanding, loopPromise);
+        loopPromise.thenCompose((v) -> {
+                    log.info("Flushing batch container numMessagesInBatch:{}",
+                            batchMessageContainer.getNumMessagesInBatch());
+                    // null message: presumably flushes what is still batched (see the log above) -- confirm.
+                    return addToCompactedLedger(ledger, null, reader.getTopic(), outstanding)
+                            .whenComplete((res, exception2) -> {
+                                // Fail the caller right away; the final handler below still deletes the ledger.
+                                if (exception2 != null) {
+                                    promise.completeExceptionally(exception2);
+                                }
+                            });
+                })
+                .thenCompose(v -> {
+                    log.info("Acking up to {} with compacted ledger id {}", phaseOneResult.lastId, ledger.getId());
+                    return ((CompactionReaderImpl<T>) reader).acknowledgeCumulativeAsync(
+                            phaseOneResult.lastId, Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId()));
+                })
+                .thenCompose((v) -> closeLedger(ledger))
+                .whenComplete((v, exception) -> {
+                    if (exception != null) {
+                        deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
+                            if (exception2 != null) {
+                                log.error("Cleanup of ledger {} failed", ledger, exception2);
+                            }
+                            // complete with original exception
+                            promise.completeExceptionally(exception);
+                        });
+                    } else {
+                        log.info("kept ledger:{}", ledger.getId());
+                        promise.complete(ledger.getId());
+                    }
+                });
+
+        return promise;
+    }
+
+    /**
+     * Writes the messages from the iterator to the compacted ledger, one scheduler task per message.
+     *
+     * <p>Each task adds the next message via {@code addToCompactedLedger} and then schedules the following
+     * iteration. Once the iterator is exhausted, the task acquires all {@code MAX_OUTSTANDING} permits of
+     * {@code outstanding} and then completes {@code promise}. Any failure completes {@code promise}
+     * exceptionally, which makes later iterations return immediately.
+     *
+     * @param topic topic name used for the compaction metrics
+     * @param reader iterator over the messages to write
+     * @param lh ledger receiving the compacted entries
+     * @param outstanding semaphore whose permits are taken by in-flight ledger writes
+     * @param promise completed once the iterator is exhausted and all permits have been reacquired,
+     *                or exceptionally on the first failure
+     */
+    private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader,
+                                  LedgerHandle lh, Semaphore outstanding,
+                                  CompletableFuture<Void> promise) {
+        if (promise.isDone()) {
+            return;
+        }
+        CompletableFuture.runAsync(() -> {
+                    if (reader.hasNext()) {
+                        Message<T> message = reader.next();
+                        mxBean.addCompactionReadOp(topic, message.size());
+                        addToCompactedLedger(lh, message, topic, outstanding)
+                                .whenComplete((res, exception2) -> {
+                                    if (exception2 != null) {
+                                        promise.completeExceptionally(exception2);
+                                    }
+                                });
+                        phaseTwoLoop(topic, reader, lh, outstanding, promise);
+                        return;
+                    }
+                    try {
+                        // Taking every permit waits for all writes that still hold one.
+                        outstanding.acquire(MAX_OUTSTANDING);
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt status instead of silently consuming it.
+                        Thread.currentThread().interrupt();
+                        promise.completeExceptionally(e);
+                        return;
+                    }
+                    promise.complete(null);
+                }, scheduler)
+                .exceptionally(ex -> {
+                    promise.completeExceptionally(ex);
+                    return null;
+                });
+    }
+
+    /**
+     * Adds a message to the batch container and, when the container reports the batch as full, writes the
+     * serialized batch to the compacted ledger. A {@code null} message flushes whatever is currently
+     * batched.
+     *
+     * @param lh ledger to append the serialized batch to
+     * @param m message to batch, or {@code null} to flush the pending batch
+     * @param topic topic name used for the compaction metrics
+     * @param outstanding semaphore bounding the number of in-flight ledger writes; one permit is taken per
+     *                    write and released in the add-entry callback
+     * @return a future completed with {@code true} once an entry has been written, with {@code false} when
+     *         nothing was written, or exceptionally when serializing or writing the batch failed
+     */
+    <T> CompletableFuture<Boolean> addToCompactedLedger(
+            LedgerHandle lh, Message<T> m, String topic, Semaphore outstanding) {
+        CompletableFuture<Boolean> bkf = new CompletableFuture<>();
+        boolean shouldWrite = m == null || batchMessageContainer.add((MessageImpl<?>) m, null);
+        if (!shouldWrite || batchMessageContainer.getNumMessagesInBatch() <= 0) {
+            // Either the batch is not full yet or there is nothing batched to write.
+            bkf.complete(false);
+            return bkf;
+        }
+        try {
+            ByteBuf serialized = batchMessageContainer.toByteBuf();
+            outstanding.acquire();
+            mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
+            long start = System.nanoTime();
+            lh.asyncAddEntry(serialized,
+                    (rc, ledger, eid, ctx) -> {
+                        outstanding.release();
+                        mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
+                        if (rc != BKException.Code.OK) {
+                            bkf.completeExceptionally(BKException.create(rc));
+                        } else {
+                            bkf.complete(true);
+                        }
+                    }, null);
+        } catch (Throwable t) {
+            if (t instanceof InterruptedException) {
+                // Restore the interrupt status instead of silently consuming it.
+                Thread.currentThread().interrupt();
+            }
+            log.error("Failed to add entry", t);
+            // discard() only accepts an Exception: wrap anything else rather than casting, otherwise a
+            // ClassCastException would replace the original failure.
+            Exception discardCause = t instanceof Exception ? (Exception) t : new RuntimeException(t);
+            batchMessageContainer.discard(discardCause);
+            return FutureUtil.failedFuture(t);
+        }
+        return bkf;
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 84107190fd3..821dd9c0c9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
 public class TwoPhaseCompactor extends Compactor {
     private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
-    private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
+    protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
     private final Duration phaseOneLoopReadTimeout;
 
     public TwoPhaseCompactor(ServiceConfiguration conf,
@@ -309,7 +309,7 @@ public class TwoPhaseCompactor extends Compactor {
         });
     }
 
-    private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String, byte[]> metadata) {
+    protected CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String, byte[]> metadata) {
         CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
 
         try {
@@ -332,7 +332,7 @@ public class TwoPhaseCompactor extends Compactor {
         return bkf;
     }
 
-    private CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle lh) {
+    protected CompletableFuture<Void> deleteLedger(BookKeeper bk, LedgerHandle lh) {
         CompletableFuture<Void> bkf = new CompletableFuture<>();
         try {
             bk.asyncDeleteLedger(lh.getId(),
@@ -349,7 +349,7 @@ public class TwoPhaseCompactor extends Compactor {
         return bkf;
     }
 
-    private CompletableFuture<Void> closeLedger(LedgerHandle lh) {
+    protected CompletableFuture<Void> closeLedger(LedgerHandle lh) {
         CompletableFuture<Void> bkf = new CompletableFuture<>();
         try {
             lh.asyncClose((rc, ledger, ctx) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java
new file mode 100644
index 00000000000..6a490c918e7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class CompactionReaderImplTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+        admin.clusters().createCluster("test",
+                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("my-property",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        String topic = "persistent://my-property/my-ns/my-compact-topic";
+        int numKeys = 5;
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        for (int i = 0; i < numKeys; i++) {
+            producer.newMessage().key("key:" + i).value("value" + i).send();
+        }
+
+        @Cleanup
+        CompactionReaderImpl<String> reader = CompactionReaderImpl
+                .create((PulsarClientImpl) pulsarClient, Schema.STRING, topic, new CompletableFuture(), null);
+
+        ConsumerBase consumerBase = spy(reader.getConsumer());
+        FieldUtils.writeDeclaredField(
+                reader, "consumer", consumerBase, true);
+
+        ReaderConfigurationData readerConfigurationData =
+                (ReaderConfigurationData) FieldUtils.readDeclaredField(
+                        reader, "readerConfiguration", true);
+
+
+        ReaderConfigurationData expected = new ReaderConfigurationData<>();
+        expected.setTopicName(topic);
+        expected.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+        expected.setStartMessageId(MessageId.earliest);
+        expected.setStartMessageFromRollbackDurationInSec(0);
+        expected.setReadCompacted(true);
+        expected.setSubscriptionMode(SubscriptionMode.Durable);
+        expected.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+
+        MessageIdImpl lastMessageId = (MessageIdImpl) reader.getLastMessageIdAsync().get();
+        MessageIdImpl id = null;
+        MessageImpl m = null;
+
+        Assert.assertEquals(readerConfigurationData, expected);
+        for (int i = 0; i < numKeys; i++) {
+            m = (MessageImpl) reader.readNextAsync().get();
+            id = (MessageIdImpl) m.getMessageId();
+        }
+        Assert.assertEquals(id, lastMessageId);
+        verify(consumerBase, times(0))
+                .acknowledgeCumulativeAsync(Mockito.any(MessageId.class));
+
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index e8e4355346e..3ec784e248c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -152,6 +152,14 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
             }).when(mockAllocator).buffer(anyInt());
 
             final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
+            /* Without `batchMessageContainer.setProducer(producer);` it throws NPE since producer is null, and
+                eventually sendAsync() catches this NPE and releases the memory and semaphore.
+                } catch (Throwable t) {
+                    completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
+                            new PulsarClientException(t, msg.getSequenceId()));
+                }
+            */
+            batchMessageContainer.setProducer(producer);
             Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
             batchMessageContainerField.setAccessible(true);
             batchMessageContainerField.set(spyProducer, batchMessageContainer);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 1d4f8c9b674..2f8cb655401 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -305,6 +305,14 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
             Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
             batchMessageContainerField.setAccessible(true);
             batchMessageContainerField.set(spyProducer, batchMessageContainer);
+            /* Without `batchMessageContainer.setProducer(producer);` it throws NPE since producer is null, and
+                eventually sendAsync() catches this NPE and releases the memory and semaphore.
+                } catch (Throwable t) {
+                    completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
+                            new PulsarClientException(t, msg.getSequenceId()));
+                }
+            */
+            batchMessageContainer.setProducer(producer);
 
             spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
             Assert.fail("can not reach here");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
new file mode 100644
index 00000000000..9fa834a166c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+
+import static org.apache.pulsar.common.api.proto.CompressionType.LZ4;
+import static org.apache.pulsar.common.api.proto.CompressionType.NONE;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.compaction.CompactionTest;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link RawBatchMessageContainerImpl}: serialization of batches with and without
+ * compression/encryption, batch-size limits and failure handling of {@code toByteBuf()}.
+ */
+public class RawBatchMessageContainerImplTest {
+    CompressionType compressionType;
+    MessageCrypto msgCrypto;
+    CryptoKeyReader cryptoKeyReader;
+    Map<String, EncryptionContext.EncryptionKey> encryptKeys;
+
+    /**
+     * Configures the compression type and the encryption material that {@link #createMessage} puts on the
+     * messages it builds.
+     *
+     * @param encrypt whether messages should carry encryption keys
+     * @param compress whether messages should be marked as LZ4-compressed (otherwise NONE)
+     */
+    public void setEncryptionAndCompression(boolean encrypt, boolean compress) {
+        if (compress) {
+            compressionType = LZ4;
+        } else {
+            compressionType = NONE;
+        }
+
+        if (encrypt) {
+            cryptoKeyReader = new CompactionTest.EncKeyReader();
+            msgCrypto = new MessageCryptoBc("test", false);
+            String key = "client-ecdsa.pem";
+            EncryptionKeyInfo publicKeyInfo = cryptoKeyReader.getPublicKey(key, null);
+            encryptKeys = Map.of(
+                    key, new EncryptionContext.EncryptionKey(publicKeyInfo.getKey(), publicKeyInfo.getMetadata()));
+        } else {
+            msgCrypto = null;
+            cryptoKeyReader = null;
+            encryptKeys = null;
+        }
+    }
+
+    /**
+     * Builds a message with ledger id 0, the given entry id and partition -1, using the currently
+     * configured compression type and encryption keys.
+     */
+    public MessageImpl<String> createMessage(String topic, String value, int entryId) {
+        MessageMetadata metadata = new MessageMetadata()
+                .setPublishTime(System.currentTimeMillis())
+                .setProducerName("test")
+                .setSequenceId(entryId);
+
+        MessageIdImpl id = new MessageIdImpl(0, entryId, -1);
+
+        if (compressionType != null) {
+            metadata.setCompression(compressionType);
+        }
+        // Never hold a null Optional: start empty and fill it only when keys are configured.
+        Optional<EncryptionContext> encryptionContext = Optional.empty();
+        if (encryptKeys != null) {
+            EncryptionContext tmp = new EncryptionContext();
+            tmp.setKeys(encryptKeys);
+            encryptionContext = Optional.of(tmp);
+        }
+        ByteBuf payload = Unpooled.copiedBuffer(value.getBytes());
+        return new MessageImpl<>(topic, id, metadata, payload, encryptionContext, null, Schema.STRING);
+    }
+
+    /**
+     * Reads the length-prefixed message id from {@code buf}, asserts its fields and releases the
+     * temporary buffer created by {@code readBytes}.
+     */
+    private static void assertMessageId(ByteBuf buf, long ledgerId, long entryId, int partition) {
+        int idSize = buf.readInt();
+        ByteBuf idBuf = buf.readBytes(idSize);
+        try {
+            MessageIdData idData = new MessageIdData();
+            idData.parseFrom(idBuf, idSize);
+            Assert.assertEquals(idData.getLedgerId(), ledgerId);
+            Assert.assertEquals(idData.getEntryId(), entryId);
+            Assert.assertEquals(idData.getPartition(), partition);
+        } finally {
+            idBuf.release();
+        }
+    }
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        setEncryptionAndCompression(false, false);
+    }
+
+    @Test
+    public void testToByteBuf() throws IOException {
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        container.add(createMessage(topic, "hi-2", 1), null);
+        ByteBuf buf = container.toByteBuf();
+
+        assertMessageId(buf, 0, 1, -1);
+
+        int metadataAndPayloadSize = buf.readInt();
+        ByteBuf metadataAndPayload = buf.readBytes(metadataAndPayloadSize);
+        MessageImpl<?> singleMessageMetadataAndPayload = MessageImpl.deserialize(metadataAndPayload);
+        MessageMetadata metadata = singleMessageMetadataAndPayload.getMessageBuilder();
+        Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
+        Assert.assertEquals(metadata.getHighestSequenceId(), 1);
+        Assert.assertEquals(metadata.getCompression(), NONE);
+
+        SingleMessageMetadata messageMetadata = new SingleMessageMetadata();
+        ByteBuf payload1 = Commands.deSerializeSingleMessageInBatch(
+                singleMessageMetadataAndPayload.getPayload(), messageMetadata, 0, 2);
+        ByteBuf payload2 = Commands.deSerializeSingleMessageInBatch(
+                singleMessageMetadataAndPayload.getPayload(), messageMetadata, 1, 2);
+
+        Assert.assertEquals(payload1.toString(Charset.defaultCharset()), "hi-1");
+        Assert.assertEquals(payload2.toString(Charset.defaultCharset()), "hi-2");
+        payload1.release();
+        payload2.release();
+        singleMessageMetadataAndPayload.release();
+        metadataAndPayload.release();
+        buf.release();
+    }
+
+    @Test
+    public void testToByteBufWithCompressionAndEncryption() throws IOException {
+        setEncryptionAndCompression(true, true);
+
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2);
+        container.setCryptoKeyReader(cryptoKeyReader);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        container.add(createMessage(topic, "hi-2", 1), null);
+        ByteBuf buf = container.toByteBuf();
+
+        assertMessageId(buf, 0, 1, -1);
+
+        int metadataAndPayloadSize = buf.readInt();
+        ByteBuf metadataAndPayload = buf.readBytes(metadataAndPayloadSize);
+        MessageImpl<?> singleMessageMetadataAndPayload = MessageImpl.deserialize(metadataAndPayload);
+
+        MessageMetadata metadata = singleMessageMetadataAndPayload.getMessageBuilder();
+        Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
+        Assert.assertEquals(metadata.getHighestSequenceId(), 1);
+        Assert.assertEquals(metadata.getCompression(), compressionType);
+
+        // Undo encryption first, then compression, before splitting the batch into single messages.
+        ByteBuf payload = singleMessageMetadataAndPayload.getPayload();
+        int maxDecryptedSize = msgCrypto.getMaxOutputSize(payload.readableBytes());
+        ByteBuffer decrypted = ByteBuffer.allocate(maxDecryptedSize);
+        msgCrypto.decrypt(() -> metadata, payload.nioBuffer(), decrypted, cryptoKeyReader);
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+        ByteBuf uncompressed = codec.decode(Unpooled.wrappedBuffer(decrypted),
+                metadata.getUncompressedSize());
+        SingleMessageMetadata messageMetadata = new SingleMessageMetadata();
+
+        ByteBuf payload1 = Commands.deSerializeSingleMessageInBatch(
+                uncompressed, messageMetadata, 0, 2);
+        ByteBuf payload2 = Commands.deSerializeSingleMessageInBatch(
+                uncompressed, messageMetadata, 1, 2);
+
+        Assert.assertEquals(payload1.toString(Charset.defaultCharset()), "hi-1");
+        Assert.assertEquals(payload2.toString(Charset.defaultCharset()), "hi-2");
+        payload1.release();
+        payload2.release();
+        singleMessageMetadataAndPayload.release();
+        metadataAndPayload.release();
+        uncompressed.release();
+        buf.release();
+    }
+
+    @Test
+    public void testToByteBufWithSingleMessage() throws IOException {
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        ByteBuf buf = container.toByteBuf();
+
+        assertMessageId(buf, 0, 0, -1);
+
+        int metadataAndPayloadSize = buf.readInt();
+        ByteBuf metadataAndPayload = buf.readBytes(metadataAndPayloadSize);
+        MessageImpl<?> singleMessageMetadataAndPayload = MessageImpl.deserialize(metadataAndPayload);
+        MessageMetadata metadata = singleMessageMetadataAndPayload.getMessageBuilder();
+        Assert.assertEquals(metadata.getNumMessagesInBatch(), 1);
+        Assert.assertEquals(metadata.getHighestSequenceId(), 0);
+        Assert.assertEquals(metadata.getCompression(), NONE);
+
+        Assert.assertEquals(singleMessageMetadataAndPayload.getPayload().toString(Charset.defaultCharset()), "hi-1");
+        singleMessageMetadataAndPayload.release();
+        metadataAndPayload.release();
+        buf.release();
+    }
+
+    @Test
+    public void testMaxNumMessagesInBatch() {
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
+        String topic = "my-topic";
+
+        boolean isFull = container.add(createMessage(topic, "hi", 0), null);
+        Assert.assertTrue(isFull);
+        Assert.assertTrue(container.isBatchFull());
+    }
+
+    @Test(expectedExceptions = UnsupportedOperationException.class)
+    public void testCreateOpSendMsg() {
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
+        container.createOpSendMsg();
+    }
+
+    @Test
+    public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
+        setEncryptionAndCompression(true, false);
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        Assert.assertEquals(container.getNumMessagesInBatch(), 1);
+        try {
+            container.toByteBuf();
+            // Fail with a clear message instead of a NullPointerException when nothing is thrown.
+            Assert.fail("toByteBuf() was expected to throw IllegalStateException");
+        } catch (IllegalStateException ex) {
+            Assert.assertEquals(ex.getClass(), IllegalStateException.class);
+        }
+        Assert.assertEquals(container.getNumMessagesInBatch(), 0);
+        Assert.assertNull(container.batchedMessageMetadataAndPayload);
+    }
+
+    @Test
+    public void testToByteBufWithEncryptionWithInvalidEncryptKeys() {
+        setEncryptionAndCompression(true, false);
+        RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
+        container.setCryptoKeyReader(cryptoKeyReader);
+        encryptKeys = new HashMap<>();
+        encryptKeys.put(null, null);
+        String topic = "my-topic";
+        container.add(createMessage(topic, "hi-1", 0), null);
+        Assert.assertEquals(container.getNumMessagesInBatch(), 1);
+        try {
+            container.toByteBuf();
+            // Fail with a clear message instead of a NullPointerException when nothing is thrown.
+            Assert.fail("toByteBuf() was expected to throw IllegalArgumentException");
+        } catch (IllegalArgumentException ex) {
+            Assert.assertEquals(ex.getClass(), IllegalArgumentException.class);
+        }
+        Assert.assertEquals(container.getNumMessagesInBatch(), 0);
+        Assert.assertNull(container.batchedMessageMetadataAndPayload);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 950095871f7..09d49195217 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -295,4 +296,51 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
 
 
     }
+
+    /**
+     * Verifies that a listener registered through {@code listen} is not invoked for entries that were
+     * already in the table view, and is invoked once for each message published afterwards.
+     */
+    @Test(timeOut = 30 * 1000)
+    public void testListen() throws Exception {
+        String topic = "persistent://public/default/tableview-listen-test";
+        admin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().key("key:" + i).value("value" + i).send();
+        }
+
+        @Cleanup
+        TableView<String> tv = pulsarClient.newTableViewBuilder(Schema.STRING)
+                .topic(topic)
+                .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+                .create();
+
+        class MockAction implements BiConsumer<String, String> {
+            // The listener may run on a thread other than the test thread, so use an atomic counter.
+            final java.util.concurrent.atomic.AtomicInteger acceptedCount =
+                    new java.util.concurrent.atomic.AtomicInteger();
+            @Override
+            public void accept(String s, String s2) {
+                acceptedCount.incrementAndGet();
+            }
+        }
+        MockAction mockAction = new MockAction();
+        tv.listen((k, v) -> mockAction.accept(k, v));
+
+        Awaitility.await()
+                .pollInterval(1, TimeUnit.SECONDS)
+                .atMost(Duration.ofMillis(5000))
+                .until(() -> tv.size() == 5);
+
+        assertEquals(mockAction.acceptedCount.get(), 0);
+
+        for (int i = 5; i < 10; i++) {
+            producer.newMessage().key("key:" + i).value("value" + i).send();
+        }
+
+        // Wait for the listener callbacks too: the table size alone can be reached before the last
+        // callback has run.
+        Awaitility.await()
+                .pollInterval(1, TimeUnit.SECONDS)
+                .atMost(Duration.ofMillis(5000))
+                .until(() -> tv.size() == 10 && mockAction.acceptedCount.get() == 5);
+
+        assertEquals(mockAction.acceptedCount.get(), 5);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index f613dda3bf3..055c595fbfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -54,8 +55,9 @@ import org.testng.annotations.Test;
 @Slf4j
 @Test(groups = "broker")
 public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
-    private ScheduledExecutorService compactionScheduler;
-    private BookKeeper bk;
+    protected ScheduledExecutorService compactionScheduler;
+    protected BookKeeper bk;
+    private TwoPhaseCompactor compactor;
 
     @BeforeMethod
     @Override
@@ -72,18 +74,23 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
         bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
+        compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
     }
 
     @AfterMethod(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
-
+        bk.close();
         if (compactionScheduler != null) {
             compactionScheduler.shutdownNow();
         }
     }
 
+    /**
+     * Compacts the given topic with this test's compactor and blocks until the compaction future
+     * completes.
+     *
+     * @param topic topic to compact
+     * @return the value produced by the compaction future
+     * @throws ExecutionException if the compaction future completed exceptionally
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    protected long compact(String topic) throws ExecutionException, InterruptedException {
+        final Long compactionResult = compactor.compact(topic).get();
+        return compactionResult;
+    }
+
     /**
      * Compaction should retain expired keys in the compacted view
      */
@@ -105,8 +112,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
                 .topic(topic)
                 .create();
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).join();
+        compact(topic);
 
         log.info(" ---- X 1: {}", mapper.writeValueAsString(
                 admin.topics().getInternalStats(topic, false)));
@@ -141,7 +147,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
                     .send();
         }
 
-        compactor.compact(topic).join();
+        compact(topic);
 
         validateMessages(pulsarClient, true, topic, round, allKeys);
 
@@ -152,7 +158,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
                     .send();
         }
 
-        compactor.compact(topic).join();
+        compact(topic);
 
         log.info(" ---- X 4: {}", mapper.writeValueAsString(
                 admin.topics().getInternalStats(topic, false)));
@@ -221,8 +227,6 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
                 .topic(topic)
                 .create();
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-
         log.info(" ---- X 1: {}", mapper.writeValueAsString(
                 admin.topics().getInternalStats(topic, false)));
 
@@ -240,7 +244,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
 
         validateMessages(pulsarClient, true, topic, round, allKeys);
 
-        compactor.compact(topic).join();
+        compact(topic);
 
         log.info(" ---- X 3: {}", mapper.writeValueAsString(
                 admin.topics().getInternalStats(topic, false)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 681b4a39c8e..ee5282bf472 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -92,8 +93,9 @@ import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
 public class CompactionTest extends MockedPulsarServiceBaseTest {
-    private ScheduledExecutorService compactionScheduler;
-    private BookKeeper bk;
+    protected ScheduledExecutorService compactionScheduler;
+    protected BookKeeper bk;
+    private TwoPhaseCompactor compactor;
 
     @BeforeMethod
     @Override
@@ -108,18 +110,33 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
         bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
+        compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
     }
 
     @AfterMethod(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
-
+        bk.close();
         if (compactionScheduler != null) {
             compactionScheduler.shutdownNow();
         }
     }
 
+    protected long compact(String topic) throws ExecutionException, InterruptedException { // single compaction entry point used by the tests in this class
+        return compactor.compact(topic).get(); // blocks until the compaction future completes and returns its result
+    }
+
+
+    protected long compact(String topic, CryptoKeyReader cryptoKeyReader) // cryptoKeyReader is accepted but not used by this implementation
+            throws ExecutionException, InterruptedException {
+        return compactor.compact(topic).get(); // NOTE(review): presumably subclasses override this to hand the key reader to their compactor - confirm
+    }
+
+    protected TwoPhaseCompactor getCompactor() { // tests branch on `getCompactor() instanceof StrategicTwoPhaseCompactor` to pick their expectations
+        return compactor; // the instance created in the @BeforeMethod hook of this class
+    }
+
     @Test
     public void testCompaction() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
@@ -147,8 +164,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             all.add(Pair.of(key, data));
         }
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
         // Compacted topic ledger should have same number of entry equals to number of unique key.
@@ -213,9 +229,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             all.add(Pair.of(key, value));
         }
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
-
+        compact(topic);
 
 
         // consumer with readCompacted enabled only get compacted entries
@@ -278,8 +292,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(m.getData(), "content2".getBytes());
         }
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
@@ -304,8 +317,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         producer.newMessage().key("key0").value("content1".getBytes()).send();
         producer.newMessage().key("key0").value("content2".getBytes()).send();
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         producer.newMessage().key("key0").value("content3".getBytes()).send();
 
@@ -334,8 +346,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         producer.newMessage().key("key0").value("content1".getBytes()).send();
         producer.newMessage().key("key0").value("content2".getBytes()).send();
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
@@ -378,8 +389,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         producer.newMessage().key("key0").value("content1".getBytes()).send();
         producer.newMessage().key("key0").value("content2".getBytes()).send();
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
                 .readCompacted(true).subscribe()) {
@@ -417,7 +427,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
 
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
 
-        new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+        compact(topic);
 
         producer.newMessage().key("key0").value("content0".getBytes()).send();
 
@@ -453,8 +463,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // Check that messages after compaction have same ids
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
@@ -512,8 +521,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
                             ((BatchMessageIdImpl)messages.get(2).getMessageId()).getEntryId());
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // Check that messages after compaction have same ids
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
@@ -521,12 +529,23 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             Message<byte[]> message1 = consumer.receive();
             Assert.assertEquals(message1.getKey(), "key1");
             Assert.assertEquals(new String(message1.getData()), "my-message-1");
-            Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
 
             Message<byte[]> message2 = consumer.receive();
             Assert.assertEquals(message2.getKey(), "key2");
             Assert.assertEquals(new String(message2.getData()), "my-message-3");
-            Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
+            if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+                MessageIdImpl id = (MessageIdImpl) messages.get(0).getMessageId();
+                MessageIdImpl id1 = new MessageIdImpl(
+                        id.getLedgerId(), id.getEntryId(), id.getPartitionIndex());
+                Assert.assertEquals(message1.getMessageId(), id1);
+                id = (MessageIdImpl) messages.get(2).getMessageId();
+                MessageIdImpl id2 = new MessageIdImpl(
+                        id.getLedgerId(), id.getEntryId(), id.getPartitionIndex());
+                Assert.assertEquals(message2.getMessageId(), id2);
+            } else {
+                Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
+                Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
+            }
         }
     }
 
@@ -556,8 +575,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -593,34 +611,46 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
-            Message<byte[]> message1 = consumer.receive();
-            Assert.assertFalse(message1.hasKey());
-            Assert.assertEquals(new String(message1.getData()), "my-message-1");
+            if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+                Message<byte[]> message3 = consumer.receive();
+                Assert.assertEquals(message3.getKey(), "key1");
+                Assert.assertEquals(new String(message3.getData()), "my-message-4");
 
-            Message<byte[]> message2 = consumer.receive();
-            Assert.assertFalse(message2.hasKey());
-            Assert.assertEquals(new String(message2.getData()), "my-message-2");
+                Message<byte[]> message4 = consumer.receive();
+                Assert.assertEquals(message4.getKey(), "key2");
+                Assert.assertEquals(new String(message4.getData()), "my-message-6");
 
-            Message<byte[]> message3 = consumer.receive();
-            Assert.assertEquals(message3.getKey(), "key1");
-            Assert.assertEquals(new String(message3.getData()), "my-message-4");
+                Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
+                assertNull(m);
+            } else {
+                Message<byte[]> message1 = consumer.receive();
+                Assert.assertFalse(message1.hasKey());
+                Assert.assertEquals(new String(message1.getData()), "my-message-1");
 
-            Message<byte[]> message4 = consumer.receive();
-            Assert.assertEquals(message4.getKey(), "key2");
-            Assert.assertEquals(new String(message4.getData()), "my-message-6");
+                Message<byte[]> message2 = consumer.receive();
+                Assert.assertFalse(message2.hasKey());
+                Assert.assertEquals(new String(message2.getData()), "my-message-2");
 
-            Message<byte[]> message5 = consumer.receive();
-            Assert.assertFalse(message5.hasKey());
-            Assert.assertEquals(new String(message5.getData()), "my-message-7");
+                Message<byte[]> message3 = consumer.receive();
+                Assert.assertEquals(message3.getKey(), "key1");
+                Assert.assertEquals(new String(message3.getData()), "my-message-4");
 
-            Message<byte[]> message6 = consumer.receive();
-            Assert.assertFalse(message6.hasKey());
-            Assert.assertEquals(new String(message6.getData()), "my-message-8");
+                Message<byte[]> message4 = consumer.receive();
+                Assert.assertEquals(message4.getKey(), "key2");
+                Assert.assertEquals(new String(message4.getData()), "my-message-6");
+
+                Message<byte[]> message5 = consumer.receive();
+                Assert.assertFalse(message5.hasKey());
+                Assert.assertEquals(new String(message5.getData()), "my-message-7");
+
+                Message<byte[]> message6 = consumer.receive();
+                Assert.assertFalse(message6.hasKey());
+                Assert.assertEquals(new String(message6.getData()), "my-message-8");
+            }
         }
     }
 
@@ -693,8 +723,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -773,8 +802,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -837,8 +865,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have been opened
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // should have opened all except last to read
         Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
@@ -866,7 +893,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         ledgersOpened.clear();
 
         // compact the topic again
-        compactor.compact(topic).get();
+        compact(topic);
 
         // shouldn't have opened first ledger (already compacted), penultimate would have some uncompacted data.
         // last ledger already open for writing
@@ -916,8 +943,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -960,8 +986,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -975,7 +1000,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
     }
 
-    class EncKeyReader implements CryptoKeyReader {
+    public static class EncKeyReader implements CryptoKeyReader {
         EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
 
         @Override
@@ -1037,8 +1062,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic, new EncKeyReader());
 
         // Check that messages after compaction have same ids
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
@@ -1083,11 +1107,8 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic, new EncKeyReader());
 
-        // with encryption, all messages are passed through compaction as it doesn't
-        // have the keys to decrypt the batch payload
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
                 .readCompacted(true).subscribe()){
@@ -1095,13 +1116,21 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(message1.getKey(), "key1");
             Assert.assertEquals(new String(message1.getData()), "my-message-1");
 
-            Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key2");
-            Assert.assertEquals(new String(message2.getData()), "my-message-2");
-
-            Message<byte[]> message3 = consumer.receive();
-            Assert.assertEquals(message3.getKey(), "key2");
-            Assert.assertEquals(new String(message3.getData()), "my-message-3");
+            if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+                Message<byte[]> message3 = consumer.receive();
+                Assert.assertEquals(message3.getKey(), "key2");
+                Assert.assertEquals(new String(message3.getData()), "my-message-3");
+            } else {
+                // with encryption, all messages are passed through compaction as it doesn't
+                // have the keys to decrypt the batch payload
+                Message<byte[]> message2 = consumer.receive();
+                Assert.assertEquals(message2.getKey(), "key2");
+                Assert.assertEquals(new String(message2.getData()), "my-message-2");
+
+                Message<byte[]> message3 = consumer.receive();
+                Assert.assertEquals(message3.getKey(), "key2");
+                Assert.assertEquals(new String(message3.getData()), "my-message-3");
+            }
         }
     }
 
@@ -1132,8 +1161,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic, new EncKeyReader());
 
         // Check that messages after compaction have same ids
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
@@ -1179,8 +1207,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic, new EncKeyReader());
 
         // with encryption, all messages are passed through compaction as it doesn't
         // have the keys to decrypt the batch payload
@@ -1191,13 +1218,20 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(message1.getKey(), "key1");
             Assert.assertEquals(new String(message1.getData()), "my-message-1");
 
-            Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key2");
-            Assert.assertEquals(new String(message2.getData()), "my-message-2");
 
-            Message<byte[]> message3 = consumer.receive();
-            Assert.assertEquals(message3.getKey(), "key2");
-            Assert.assertEquals(new String(message3.getData()), "my-message-3");
+            if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+                Message<byte[]> message3 = consumer.receive();
+                Assert.assertEquals(message3.getKey(), "key2");
+                Assert.assertEquals(new String(message3.getData()), "my-message-3");
+            } else {
+                Message<byte[]> message2 = consumer.receive();
+                Assert.assertEquals(message2.getKey(), "key2");
+                Assert.assertEquals(new String(message2.getData()), "my-message-2");
+
+                Message<byte[]> message3 = consumer.receive();
+                Assert.assertEquals(message3.getKey(), "key2");
+                Assert.assertEquals(new String(message3.getData()), "my-message-3");
+            }
         }
     }
 
@@ -1254,8 +1288,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
 
         // compact the topic
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic, new EncKeyReader());
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                 .cryptoKeyReader(new EncKeyReader())
@@ -1264,22 +1297,32 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(message1.getKey(), "key0");
             Assert.assertEquals(new String(message1.getData()), "my-message-0");
 
-            // see all messages from batch
-            Message<byte[]> message2 = consumer.receive();
-            Assert.assertEquals(message2.getKey(), "key2");
-            Assert.assertEquals(new String(message2.getData()), "my-message-2");
+            if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+                Message<byte[]> message3 = consumer.receive();
+                Assert.assertEquals(message3.getKey(), "key3");
+                Assert.assertEquals(new String(message3.getData()), "my-message-3");
 
-            Message<byte[]> message3 = consumer.receive();
-            Assert.assertEquals(message3.getKey(), "key3");
-            Assert.assertEquals(new String(message3.getData()), "my-message-3");
-
-            Message<byte[]> message4 = consumer.receive();
-            Assert.assertEquals(message4.getKey(), "key2");
-            Assert.assertEquals(new String(message4.getData()), "");
-
-            Message<byte[]> message5 = consumer.receive();
-            Assert.assertEquals(message5.getKey(), "key4");
-            Assert.assertEquals(new String(message5.getData()), "my-message-4");
+                Message<byte[]> message5 = consumer.receive();
+                Assert.assertEquals(message5.getKey(), "key4");
+                Assert.assertEquals(new String(message5.getData()), "my-message-4");
+            } else {
+                // see all messages from batch
+                Message<byte[]> message2 = consumer.receive();
+                Assert.assertEquals(message2.getKey(), "key2");
+                Assert.assertEquals(new String(message2.getData()), "my-message-2");
+
+                Message<byte[]> message3 = consumer.receive();
+                Assert.assertEquals(message3.getKey(), "key3");
+                Assert.assertEquals(new String(message3.getData()), "my-message-3");
+
+                Message<byte[]> message4 = consumer.receive();
+                Assert.assertEquals(message4.getKey(), "key2");
+                Assert.assertEquals(new String(message4.getData()), "");
+
+                Message<byte[]> message5 = consumer.receive();
+                Assert.assertEquals(message5.getKey(), "key4");
+                Assert.assertEquals(new String(message5.getData()), "my-message-4");
+            }
         }
     }
 
@@ -1303,8 +1346,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         producer.newMessage().key("1").value("".getBytes()).send();
         producer.newMessage().key("2").value("".getBytes()).send();
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         Set<String> expected = Sets.newHashSet("3");
         // consumer with readCompacted enabled only get compacted entries
@@ -1329,8 +1371,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         producer.newMessage().key("1").value("".getBytes()).send();
         producer.newMessage().key("2").value("".getBytes()).send();
 
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // consumer with readCompacted enabled only get compacted entries
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1364,8 +1405,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         FutureUtil.waitForAll(futures).get();
 
         // 2.compact the topic.
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // consumer with readCompacted enabled only get compacted entries
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1409,8 +1449,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         FutureUtil.waitForAll(futures).get();
 
         // 2.compact the topic.
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // consumer with readCompacted enabled only get compacted entries
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1465,8 +1504,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         FutureUtil.waitForAll(futures).get();
 
         // 2.compact the topic.
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // consumer with readCompacted enabled only get compacted entries
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1519,8 +1557,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         FutureUtil.waitForAll(futures).get();
 
         // 2.compact the topic.
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // consumer with readCompacted enabled only get compacted entries
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1559,8 +1596,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         FutureUtil.waitForAll(futures).get();
 
         // 2.compact the topic.
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // 3. Send more ten messages
         futures.clear();
@@ -1608,8 +1644,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         FutureUtil.waitForAll(futures).get();
 
         // 2.compact the topic.
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
 
         // 3. Send more ten messages
         futures.clear();
@@ -1638,7 +1673,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         producer.newMessage().key(key).value(("").getBytes()).send();
 
         // 5.compact the topic.
-        compactor.compact(topic).get();
+        compact(topic);
 
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 8099cd51668..e86be6a4db8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -58,7 +59,12 @@ import org.testng.annotations.Test;
 @Test(groups = "broker-compaction")
 public class CompactorTest extends MockedPulsarServiceBaseTest {
 
-    private ScheduledExecutorService compactionScheduler;
+    protected ScheduledExecutorService compactionScheduler;
+
+    protected BookKeeper bk;
+    protected Compactor compactor;
+
+
 
     @BeforeMethod
     @Override
@@ -73,20 +79,31 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
 
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
+        bk = pulsar.getBookKeeperClientFactory().create(
+                this.conf, null, null, Optional.empty(), null);
+        compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
     }
 
+
     @AfterMethod(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
+        bk.close();
         compactionScheduler.shutdownNow();
     }
 
+    protected long compact(String topic) throws ExecutionException, InterruptedException { // single compaction entry point used by the tests in this class
+        return compactor.compact(topic).get(); // blocks until compaction completes; compactAndVerify treats the result as the compacted ledger id
+    }
+
+    protected Compactor getCompactor() { // accessor used by compactAndVerify to read compaction stats
+        return compactor; // the instance created in the @BeforeMethod hook of this class
+    }
+
     private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception {
-        BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null, null, Optional.empty(), null);
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        long compactedLedgerId = compactor.compact(topic).get();
+
+        long compactedLedgerId = compact(topic);
 
         LedgerHandle ledger = bk.openLedger(compactedLedgerId,
                                             Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
@@ -111,7 +128,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
             m.close();
         }
         if (checkMetrics) {
-            CompactionRecord compactionRecord = compactor.getStats().getCompactionRecordForTopic(topic).get();
+            CompactionRecord compactionRecord = getCompactor().getStats().getCompactionRecordForTopic(topic).get();
             long compactedTopicRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
             long lastCompactSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
             long lastCompactFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
@@ -227,10 +244,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         // trigger creation of topic on server side
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
 
-        BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
-                this.conf, null, null, Optional.empty(), null);
-        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-        compactor.compact(topic).get();
+        compact(topic);
     }
 
     @Test
@@ -240,7 +254,6 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class),
                 Mockito.mock(BookKeeper.class), compactionScheduler);
         Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);
-
     }
 
     public ByteBuf extractPayload(RawMessage m) throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/NumericOrderCompactionStrategy.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/NumericOrderCompactionStrategy.java
new file mode 100644
index 00000000000..64e761c34cc
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/NumericOrderCompactionStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+
+public class NumericOrderCompactionStrategy implements TopicCompactionStrategy<Integer> {
+
+        @Override
+        public Schema<Integer> getSchema() {
+            return Schema.INT32;
+        }
+
+        @Override
+        public boolean shouldKeepLeft(Integer prev, Integer cur) {
+            if (prev == null || cur == null) {
+                return false;
+            }
+            return prev >= cur;
+        }
+    }
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
new file mode 100644
index 00000000000..1cac04c2fa9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class StrategicCompactionRetentionTest extends CompactionRetentionTest {
+    private TopicCompactionStrategy strategy;
+    private StrategicTwoPhaseCompactor compactor;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
+        strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
+    }
+
+    @Override
+    protected long compact(String topic) throws ExecutionException, InterruptedException {
+        return (long) compactor.compact(topic, strategy).get();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
new file mode 100644
index 00000000000..135a839bd54
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "flaky")
+public class StrategicCompactionTest extends CompactionTest {
+    private TopicCompactionStrategy strategy;
+    private StrategicTwoPhaseCompactor compactor;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
+        strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
+    }
+
+    @Override
+    protected long compact(String topic) throws ExecutionException, InterruptedException {
+        return (long) compactor.compact(topic, strategy).get();
+    }
+
+    @Override
+    protected long compact(String topic, CryptoKeyReader cryptoKeyReader)
+            throws ExecutionException, InterruptedException {
+        return (long) compactor.compact(topic, strategy, cryptoKeyReader).get();
+    }
+
+    @Override
+    protected TwoPhaseCompactor getCompactor() {
+        return compactor;
+    }
+
+
+    @Test
+    public void testNumericOrderCompaction() throws Exception {
+
+        strategy = new NumericOrderCompactionStrategy();
+
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        final int numMessages = 50;
+        final int maxKeys = 5;
+
+        Producer<Integer> producer = pulsarClient.newProducer(strategy.getSchema())
+                .topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        Map<String, Integer> expected = new HashMap<>();
+        List<Pair<String, Integer>> all = new ArrayList<>();
+        Random r = new Random(0);
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+        for (int j = 0; j < numMessages; j++) {
+            int keyIndex = r.nextInt(maxKeys);
+            String key = "key" + keyIndex;
+            int seed = r.nextInt(j + 1);
+            Integer cur = seed < j / 5 ? null : seed;
+            producer.newMessage().key(key).value(cur).send();
+            Integer prev = expected.get(key);
+            if (!strategy.shouldKeepLeft(prev, cur)) {
+                if (cur == null) {
+                    expected.remove(key);
+                } else {
+                    expected.put(key, cur);
+                }
+            }
+            all.add(Pair.of(key, cur));
+        }
+
+        compact(topic);
+
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
+        // The compacted topic ledger should have as many entries as there are unique keys.
+        Assert.assertEquals(expected.size(), internalStats.compactedLedger.entries);
+        Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
+        Assert.assertFalse(internalStats.compactedLedger.offloaded);
+
+        Map<String, Integer> expectedCopy = new HashMap<>(expected);
+        // a consumer with readCompacted enabled only gets the compacted entries
+        try (Consumer<Integer> consumer = pulsarClient.newConsumer(strategy.getSchema()).topic(topic).subscriptionName("sub1")
+                .readCompacted(true).subscribe()) {
+            while (!expected.isEmpty()) {
+                Message<Integer> m = consumer.receive(2, TimeUnit.SECONDS);
+                Assert.assertEquals(m.getValue(), expected.remove(m.getKey()), m.getKey());
+            }
+            Assert.assertTrue(expected.isEmpty());
+        }
+
+        // the full backlog can still be read when readCompacted is disabled
+        try (Consumer<Integer> consumer = pulsarClient.newConsumer(strategy.getSchema()).topic(topic).subscriptionName("sub1")
+                .readCompacted(false).subscribe()) {
+            while (true) {
+                Message<Integer> m = consumer.receive(2, TimeUnit.SECONDS);
+                Pair<String, Integer> expectedMessage = all.remove(0);
+                Assert.assertEquals(m.getKey(), expectedMessage.getLeft());
+                Assert.assertEquals(m.getValue(), expectedMessage.getRight());
+                if (all.isEmpty()) {
+                    break;
+                }
+            }
+            Assert.assertTrue(all.isEmpty());
+        }
+
+        TableView<Integer> tableView = pulsar.getClient().newTableViewBuilder(strategy.getSchema())
+                .topic(topic)
+                .loadConf(Map.of(
+                        "topicCompactionStrategyClassName", strategy.getClass().getCanonicalName()))
+                .create();
+        Assert.assertEquals(tableView.entrySet(), expectedCopy.entrySet());
+    }
+
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
new file mode 100644
index 00000000000..91dd8a2bd35
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-compaction")
+public class StrategicCompactorTest extends CompactorTest {
+    private TopicCompactionStrategy strategy;
+
+    private StrategicTwoPhaseCompactor compactor;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
+        strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
+    }
+
+    @Override
+    protected long compact(String topic) throws ExecutionException, InterruptedException {
+        return (long) compactor.compact(topic, strategy).get();
+    }
+
+    @Override
+    protected Compactor getCompactor() {
+        return compactor;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
new file mode 100644
index 00000000000..0ecd09606ce
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-compaction")
+public class TopicCompactionStrategyTest {
+    public static class DummyTopicCompactionStrategy implements TopicCompactionStrategy<byte[]> {
+
+        @Override
+        public Schema getSchema() {
+            return Schema.BYTES;
+        }
+
+        @Override
+        public boolean shouldKeepLeft(byte[] prev, byte[] cur) {
+            return false;
+        }
+    }
+
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testLoadInvalidTopicCompactionStrategy() {
+        TopicCompactionStrategy.load("uknown");
+    }
+
+    @Test
+    public void testNumericOrderCompactionStrategy() {
+        TopicCompactionStrategy<Integer> strategy =
+                TopicCompactionStrategy.load(NumericOrderCompactionStrategy.class.getCanonicalName());
+        Assert.assertFalse(strategy.shouldKeepLeft(1, 2));
+        Assert.assertTrue(strategy.shouldKeepLeft(2, 1));
+    }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
index 136d8f53823..9e5008c8bd0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
@@ -89,7 +89,15 @@ public interface TableView<T> extends Closeable {
     void forEach(BiConsumer<String, T> action);
 
     /**
-     * Performs the give action for each entry in this map until all entries
+     * Performs the given action for each future entry in this map until all entries
+     * have been processed or the action throws an exception.
+     *
+     * @param action The action to be performed for each entry
+     */
+    void listen(BiConsumer<String, T> action);
+
+    /**
+     * Performs the given action for each entry in this map until all entries
      * have been processed or the action throws an exception.
      *
      * @param action The action to be performed for each entry
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index c601f51a725..fdbf1f15c29 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -49,21 +49,21 @@ import org.slf4j.LoggerFactory;
  */
 class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
 
-    private MessageMetadata messageMetadata = new MessageMetadata();
+    protected MessageMetadata messageMetadata = new MessageMetadata();
     // sequence id for this batch which will be persisted as a single entry by broker
     @Getter
     @Setter
-    private long lowestSequenceId = -1L;
+    protected long lowestSequenceId = -1L;
     @Getter
     @Setter
-    private long highestSequenceId = -1L;
-    private ByteBuf batchedMessageMetadataAndPayload;
-    private List<MessageImpl<?>> messages = new ArrayList<>(maxMessagesNum);
+    protected long highestSequenceId = -1L;
+    protected ByteBuf batchedMessageMetadataAndPayload;
+    protected List<MessageImpl<?>> messages = new ArrayList<>(maxMessagesNum);
     protected SendCallback previousCallback = null;
     // keep track of callbacks for individual messages being published in a batch
     protected SendCallback firstCallback;
 
-    private final ByteBufAllocator allocator;
+    protected final ByteBufAllocator allocator;
     private static final int SHRINK_COOLING_OFF_PERIOD = 10;
     private int consecutiveShrinkTime = 0;
 
@@ -111,9 +111,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
                 }
             } catch (Throwable e) {
                 log.error("construct first message failed, exception is ", e);
-                producer.semaphoreRelease(getNumMessagesInBatch());
-                producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize()
-                        + batchAllocatedSizeBytes);
+                if (producer != null) {
+                    producer.semaphoreRelease(getNumMessagesInBatch());
+                    producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize()
+                            + batchAllocatedSizeBytes);
+                }
                 discard(new PulsarClientException(e));
                 return false;
             }
@@ -131,11 +133,14 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
             messageMetadata.setSequenceId(lowestSequenceId);
         }
         highestSequenceId = msg.getSequenceId();
-        ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
+        if (producer != null) {
+            ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
+        }
+
         return isBatchFull();
     }
 
-    private ByteBuf getCompressedBatchMetadataAndPayload() {
+    protected ByteBuf getCompressedBatchMetadataAndPayload() {
         int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex();
         int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex();
 
@@ -308,11 +313,13 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         return op;
     }
 
-    private void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
+    protected void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
         int delta = updatedSizeBytes - batchAllocatedSizeBytes;
         batchAllocatedSizeBytes = updatedSizeBytes;
         if (delta != 0) {
-            producer.client.getMemoryLimitController().forceReserveMemory(delta);
+            if (producer != null) {
+                producer.client.getMemoryLimitController().forceReserveMemory(delta);
+            }
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 238beffc79f..83931c40394 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -36,7 +36,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
@@ -69,7 +68,8 @@ public class ReaderImpl<T> implements Reader<T> {
         consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
         consumerConfiguration.setSubscriptionName(subscription);
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
-        consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
+        consumerConfiguration.setSubscriptionMode(readerConfiguration.getSubscriptionMode());
+        consumerConfiguration.setSubscriptionInitialPosition(readerConfiguration.getSubscriptionInitialPosition());
         consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
         consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
         consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
index 31df24b60f8..a7dc6f94a49 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
@@ -30,6 +30,7 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
     private String topicName = null;
     private String subscriptionName = null;
     private long autoUpdatePartitionsSeconds = 60;
+    private String topicCompactionStrategyClassName = null;
 
     @Override
     public TableViewConfigurationData clone() {
@@ -38,6 +39,7 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
             clone.setTopicName(topicName);
             clone.setAutoUpdatePartitionsSeconds(autoUpdatePartitionsSeconds);
             clone.setSubscriptionName(subscriptionName);
+            clone.setTopicCompactionStrategyClassName(topicCompactionStrategyClassName);
             return clone;
         } catch (CloneNotSupportedException e) {
             throw new AssertionError();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 9e7257f23fb..64f2e642991 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
 
 @Slf4j
 public class TableViewImpl<T> implements TableView<T> {
@@ -54,6 +55,7 @@ public class TableViewImpl<T> implements TableView<T> {
     private final List<BiConsumer<String, T>> listeners;
     private final ReentrantLock listenersMutex;
     private final boolean isPersistentTopic;
+    private TopicCompactionStrategy<T> compactionStrategy;
 
     TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
         this.conf = conf;
@@ -62,6 +64,7 @@ public class TableViewImpl<T> implements TableView<T> {
         this.immutableData = Collections.unmodifiableMap(data);
         this.listeners = new ArrayList<>();
         this.listenersMutex = new ReentrantLock();
+        this.compactionStrategy = TopicCompactionStrategy.load(conf.getTopicCompactionStrategyClassName());
         ReaderBuilder<T> readerBuilder = client.newReader(schema)
                 .topic(conf.getTopicName())
                 .startMessageId(MessageId.earliest)
@@ -125,6 +128,16 @@ public class TableViewImpl<T> implements TableView<T> {
         data.forEach(action);
     }
 
+    @Override
+    public void listen(BiConsumer<String, T> action) {
+        try {
+            listenersMutex.lock();
+            listeners.add(action);
+        } finally {
+            listenersMutex.unlock();
+        }
+    }
+
     @Override
     public void forEachAndListen(BiConsumer<String, T> action) {
         // Ensure we iterate over all the existing entry _and_ start the listening from the exact next message
@@ -157,30 +170,40 @@ public class TableViewImpl<T> implements TableView<T> {
     private void handleMessage(Message<T> msg) {
         try {
             if (msg.hasKey()) {
+                String key = msg.getKey();
+                T cur = msg.size() > 0 ? msg.getValue() : null;
                 if (log.isDebugEnabled()) {
                     log.debug("Applying message from topic {}. key={} value={}",
                             conf.getTopicName(),
-                            msg.getKey(),
-                            msg.getValue());
+                            key,
+                            cur);
                 }
 
-                try {
-                    listenersMutex.lock();
-                    if (null == msg.getValue()){
-                        data.remove(msg.getKey());
-                    } else {
-                        data.put(msg.getKey(), msg.getValue());
-                    }
+                T prev = data.get(key);
+                boolean update = true;
+                if (compactionStrategy != null) {
+                    update = !compactionStrategy.shouldKeepLeft(prev, cur);
+                }
+
+                if (update) {
+                    try {
+                        listenersMutex.lock();
+                        if (null == cur) {
+                            data.remove(key);
+                        } else {
+                            data.put(key, cur);
+                        }
 
-                    for (BiConsumer<String, T> listener : listeners) {
-                        try {
-                            listener.accept(msg.getKey(), msg.getValue());
-                        } catch (Throwable t) {
-                            log.error("Table view listener raised an exception", t);
+                        for (BiConsumer<String, T> listener : listeners) {
+                            try {
+                                listener.accept(key, cur);
+                            } catch (Throwable t) {
+                                log.error("Table view listener raised an exception", t);
+                            }
                         }
+                    } finally {
+                        listenersMutex.unlock();
                     }
-                } finally {
-                    listenersMutex.unlock();
                 }
             }
         } finally {
@@ -208,6 +231,9 @@ public class TableViewImpl<T> implements TableView<T> {
                                   handleMessage(msg);
                                   readAllExistingMessages(reader, future, startTime, messagesRead);
                                }).exceptionally(ex -> {
+                                   logException(
+                                           String.format("Reader %s was interrupted while reading existing messages",
+                                                   reader.getTopic()), ex);
                                    future.completeExceptionally(ex);
                                    return null;
                                });
@@ -231,8 +257,18 @@ public class TableViewImpl<T> implements TableView<T> {
                     handleMessage(msg);
                     readTailMessages(reader);
                 }).exceptionally(ex -> {
-                    log.info("Reader {} was interrupted", reader.getTopic());
+                    logException(
+                            String.format("Reader %s was interrupted while reading tail messages.",
+                                    reader.getTopic()), ex);
                     return null;
                 });
     }
+
+    private void logException(String msg, Throwable ex) {
+        if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
+            log.warn(msg, ex);
+        } else {
+            log.error(msg, ex);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 42e115ea872..86707d2aa2f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -32,6 +32,8 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.ReaderInterceptor;
 import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
 
 @Data
 public class ReaderConfigurationData<T> implements Serializable, Cloneable {
@@ -153,6 +155,10 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
 
     private long expireTimeOfIncompleteChunkedMessageMillis = TimeUnit.MINUTES.toMillis(1);
 
+    private SubscriptionMode subscriptionMode = SubscriptionMode.NonDurable;
+
+    private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
     @JsonIgnore
     public String getTopicName() {
         if (topicNames.size() > 1) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
new file mode 100644
index 00000000000..f06374b234f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * Defines a custom strategy to compact messages in a topic.
+ * This strategy can be passed to Topic Compactor and Table View to compact messages in a custom way.
+ *
+ * Examples:
+ *
+ * TopicCompactionStrategy strategy = new MyTopicCompactionStrategy();
+ *
+ * // Run topic compaction by the compaction strategy.
+ * // While compacting messages for each key, the current message replaces the previous one
+ * //   only if TopicCompactionStrategy.shouldKeepLeft(prev, cur) returns false.
+ * StrategicTwoPhaseCompactor compactor = new StrategicTwoPhaseCompactor(...);
+ * compactor.compact(topic, strategy);
+ *
+ * // Run table view by the compaction strategy.
+ * // While updating the table view {@code <key,value>} map, the current message replaces the previous one
+ * //   only if TopicCompactionStrategy.shouldKeepLeft(prev, cur) returns false.
+ * TableView tableView = pulsar.getClient().newTableViewBuilder(strategy.getSchema())
+ *                 .topic(topic)
+ *                 .loadConf(Map.of(
+ *                         "topicCompactionStrategyClassName", strategy.getClass().getCanonicalName()))
+ *                 .create();
+ */
+public interface TopicCompactionStrategy<T> {
+
+    /**
+     * Returns the schema object for this strategy.
+     * @return the schema object for this strategy
+     */
+    Schema<T> getSchema();
+    /**
+     * Tests whether compaction should keep the left (previous) message
+     * rather than the right (current) message for the same key.
+     *
+     * @param prev previous message value
+     * @param cur current message value
+     * @return {@code true} to keep the previous message and ignore the current message; {@code false} otherwise
+     */
+    boolean shouldKeepLeft(T prev, T cur);
+
+    static TopicCompactionStrategy load(String topicCompactionStrategyClassName) {
+        if (topicCompactionStrategyClassName == null) {
+            return null;
+        }
+        try {
+            Class<?> clazz = Class.forName(topicCompactionStrategyClassName);
+            Object instance = clazz.getDeclaredConstructor().newInstance();
+            return (TopicCompactionStrategy) instance;
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    "Error when loading topic compaction strategy: " + topicCompactionStrategyClassName, e);
+        }
+    }
+}