You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/24 09:54:14 UTC

[pulsar] branch branch-2.8 updated (5d52994 -> b3e7be9)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 5d52994  Bugfix: Fix rackaware placement policy init error (#12097)
     new 5b993ba  Fix messages in TopicPolicies will never be cleaned up (#11928)
     new 1b4b71b  [pulsar-perf] Make it possible to disable poolMessages (#12090)
     new 9ccfe96  [Client] Fix ConcurrentModificationException in sendAsync (#11884)
     new bff59fd  use correct line separator instead of \n (#12143)
     new d5d40a2  [pulsar-functions-go] support set subscription position (#11990)
     new b3e7be9  Return the last position of the compacted data while the original data been deleted. (#12161)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/service/ServerCnx.java    | 33 +++++++--
 .../SystemTopicBasedTopicPoliciesService.java      | 50 +++++++++----
 .../systopic/TopicPoliciesSystemTopicClient.java   | 19 +++++
 .../apache/pulsar/compaction/CompactedTopic.java   |  2 +
 .../pulsar/compaction/CompactedTopicImpl.java      | 12 +++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 79 ++++++++++++++++++++
 .../pulsar/client/impl/PulsarTestClient.java       | 11 ++-
 .../pulsar/compaction/CompactedTopicTest.java      | 56 ++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 78 ++++++++++++++++++--
 .../pulsar/client/impl/OpSendMsgQueueTest.java     | 85 ++++++++++++++++++++++
 .../apache/pulsar/common/util/FutureUtilTest.java  |  4 +-
 pulsar-function-go/conf/conf.go                    |  9 ++-
 pulsar-function-go/conf/conf.yaml                  |  1 +
 pulsar-function-go/pf/instanceConf.go              |  7 +-
 pulsar-function-go/pf/instanceConf_test.go         |  7 +-
 .../functions/instance/go/GoInstanceConfig.java    |  2 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |  2 +
 .../pulsar/testclient/PerformanceConsumer.java     |  4 +-
 18 files changed, 415 insertions(+), 46 deletions(-)
 create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java

[pulsar] 06/06: Return the last position of the compacted data while the original data been deleted. (#12161)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b3e7be9447a24b46f765bb1d0b2debe9a7349d1a
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Sep 24 16:40:55 2021 +0800

    Return the last position of the compacted data while the original data been deleted. (#12161)
    
    Currently, for the get last message ID request the broker returns -1:-1 if all the original data been deleted.
    
    ```
    09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:44443] Created subscription on topic xxx
    09:51:12.156 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Reset cursor:ManagedCursorImpl{ledger=xxx, name=__compaction, ackPos=44946:0, readPos=44946:1} to 66425:-1 since ledger consumed completely
    09:51:12.156 [BookKeeperClientWorker-OrderedExecutor-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Ledger 44946 contains the current last confirmed entry 44946:0, and it is going to be deleted
    09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] End TrimConsumedLedgers. ledgers=1 totalSize=0
    09:51:12.159 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Removing ledger 44946 - size: 3999
    ```
    
    After the rollover task, the topic internal stats will be:
    
    ```
    {
        "entriesAddedCounter": 0,
        "numberOfEntries": 0,
        "totalSize": 0,
        "currentLedgerEntries": 0,
        "currentLedgerSize": 0,
        "lastLedgerCreatedTimestamp": "2021-09-20T09:51:12.15Z",
        "waitingCursorsCount": 29,
        "pendingAddEntriesCount": 0,
        "lastConfirmedEntry": "44946:0",
        "state": "LedgerOpened",
        "ledgers": [
            {
                "ledgerId": 66425,
                "entries": 0,
                "size": 0,
                "offloaded": false,
                "underReplicated": false
            }
        ],
        "cursors": {
            "__compaction": {
                "markDeletePosition": "44946:0",
                "readPosition": "44946:1",
                "waitingReadOp": false,
                "pendingReadOps": 0,
                "messagesConsumedCounter": 0,
                "cursorLedger": -1,
                "cursorLedgerLastEntry": -1,
                "individuallyDeletedMessages": "[]",
                "lastLedgerSwitchTimestamp": "2021-09-20T09:51:12.154Z",
                "state": "NoLedger",
                "numberOfEntriesSinceFirstNotAckedMessage": 1,
                "totalNonContiguousDeletedMessagesRange": 0,
                "subscriptionHavePendingRead": false,
                "subscriptionHavePendingReplayRead": false,
                "properties": {
                    "CompactedTopicLedger": 64365
                }
            }
        },
        "schemaLedgers": [],
        "compactedLedger": {
            "ledgerId": 64365,
            "entries": 1,
            "size": 4024,
            "offloaded": false,
            "underReplicated": false
        }
    }
    ```
    
    At this time, when a reader call hasMessageAvailable(), the client will get the last message id from the broker, the NonRecoverableLedgerException will throw at the broker side due the ledger 44946 has been deleted.
    
    ```
    12:41:40.937 [pulsar-io-4-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.16.124.36:53488] Created subscription on topic xxx / yyy
    12:41:41.131 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx] Error opening ledger for reading at position 44946:0 - org.apache.bookkeeper.mledger.ManagedLedgerException$NonRecoverableLedgerException: No such ledger exists on Metadata Server
    ```
    
    The problem is we are not checking if there is compacted data for the topic. If the topic has compacted data but encounter the above situation, we should return the last message ID of the compacted Ledger to the client.
    
    Added the test for the new changes.
    
    (cherry picked from commit 86e720fc90b8ca42486a3ce848235761b524e723)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 33 ++++++++++---
 .../apache/pulsar/compaction/CompactedTopic.java   |  2 +
 .../pulsar/compaction/CompactedTopicImpl.java      | 12 +++++
 .../pulsar/compaction/CompactedTopicTest.java      | 56 ++++++++++++++++++++++
 4 files changed, 97 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 340685b..a6d7f9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1719,12 +1719,33 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         batchSizeFuture.whenComplete((batchSize, e) -> {
             if (e != null) {
                 if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
-                    // in this case, the ledgers been removed except the current ledger
-                    // and current ledger without any data
-                    ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                            -1, -1, partitionIndex, -1,
-                            markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
-                            markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                    persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> {
+                        if (entry != null) {
+                            // in this case, all the data has been compacted, so return the last position
+                            // in the compacted ledger to the client
+                            MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+                            int bs = metadata.getNumMessagesInBatch();
+                            int largestBatchIndex = bs > 0 ? bs - 1 : -1;
+                            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                                    entry.getLedgerId(), entry.getLedgerId(), partitionIndex, largestBatchIndex,
+                                    markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+                                    markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                            entry.release();
+                        } else {
+                            // in this case, the ledgers been removed except the current ledger
+                            // and current ledger without any data
+                            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                                    -1, -1, partitionIndex, -1,
+                                    markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+                                    markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                        }
+                    }).exceptionally(ex -> {
+                        ctx.writeAndFlush(Commands.newError(
+                                requestId, ServerError.MetadataError,
+                                "Failed to read last entry of the compacted Ledger "
+                                        + ex.getCause().getMessage()));
+                        return null;
+                    });
                 } else {
                     ctx.writeAndFlush(Commands.newError(
                             requestId, ServerError.MetadataError,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 4922852..7c96937 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.compaction;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.Consumer;
@@ -31,4 +32,5 @@ public interface CompactedTopic {
                                 boolean isFirstRead,
                                 ReadEntriesCallback callback,
                                 Consumer consumer);
+    CompletableFuture<Entry> readLastEntryOfCompactedLedger();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index ee9fbbc..1313413 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -281,6 +281,18 @@ public class CompactedTopicImpl implements CompactedTopic {
         return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get());
     }
 
+    @Override
+    public CompletableFuture<Entry> readLastEntryOfCompactedLedger() {
+        if (compactionHorizon == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return compactedTopicContext.thenCompose(context ->
+                readEntries(context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed())
+                        .thenCompose(entries -> entries.size() > 0
+                                ? CompletableFuture.completedFuture(entries.get(0))
+                                : CompletableFuture.completedFuture(null)));
+    }
+
     private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
         return ComparisonChain.start()
             .compare(p.getLedgerId(), m.getLedgerId())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 410d1e5..b70f495 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -40,10 +40,13 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
@@ -56,8 +59,10 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -360,4 +365,55 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(compactedMsgCount, 1);
         Assert.assertEquals(nonCompactedMsgCount, numMessages);
     }
+
+    @Test
+    public void testLastMessageIdForCompactedLedger() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/testLastMessageIdForCompactedLedger-" + UUID.randomUUID();
+        final String key = "1";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+        final int numMessages = 10;
+        final String msg = "test compaction msg";
+        for (int i = 0; i < numMessages; ++i) {
+            producer.newMessage().key(key).value(msg).send();
+        }
+        admin.topics().triggerCompaction(topic);
+        boolean succeed = retryStrategically((test) -> {
+            try {
+                return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 200);
+
+        Assert.assertTrue(succeed);
+
+        PersistentTopicInternalStats stats0 = admin.topics().getInternalStats(topic);
+        admin.topics().unload(topic);
+        PersistentTopicInternalStats stats1 = admin.topics().getInternalStats(topic);
+        // Make sure the ledger rollover has triggered.
+        Assert.assertTrue(stats0.currentLedgerSize != stats1.currentLedgerSize);
+
+        Optional<Topic> topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get();
+        Assert.assertTrue(topicRef.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topicRef.get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
+        managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(managedLedger.getCurrentLedgerEntries(), 0);
+            Assert.assertTrue(managedLedger.getLastConfirmedEntry().getEntryId() != -1);
+            Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+        });
+
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test")
+                .readCompacted(true)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        Assert.assertTrue(reader.hasMessageAvailable());
+        Assert.assertEquals(msg, reader.readNext().getValue());
+        Assert.assertFalse(reader.hasMessageAvailable());
+    }
 }

[pulsar] 03/06: [Client] Fix ConcurrentModificationException in sendAsync (#11884)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9ccfe96bae46fa4cc17130c0a2038b3b57c0cb59
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Sep 21 12:39:04 2021 +0300

    [Client] Fix ConcurrentModificationException in sendAsync (#11884)
    
    
    (cherry picked from commit a1c10288f2fb011443b6edb98def1841a310157d)
---
 .../pulsar/client/impl/PulsarTestClient.java       | 11 ++-
 .../apache/pulsar/client/impl/ProducerImpl.java    | 78 ++++++++++++++++++--
 .../pulsar/client/impl/OpSendMsgQueueTest.java     | 85 ++++++++++++++++++++++
 3 files changed, 161 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index 8fede95..eebcf5b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -22,8 +22,6 @@ import static org.testng.Assert.assertEquals;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -139,14 +137,15 @@ public class PulsarTestClient extends PulsarClientImpl {
         return new ProducerImpl<T>(this, topic, conf, producerCreatedFuture, partitionIndex, schema,
                 interceptors) {
             @Override
-            protected BlockingQueue<OpSendMsg> createPendingMessagesQueue() {
-                return new ArrayBlockingQueue<OpSendMsg>(conf.getMaxPendingMessages()) {
+            protected OpSendMsgQueue createPendingMessagesQueue() {
+                return new OpSendMsgQueue() {
                     @Override
-                    public void put(OpSendMsg opSendMsg) throws InterruptedException {
-                        super.put(opSendMsg);
+                    public boolean add(OpSendMsg opSendMsg) {
+                        boolean added = super.add(opSendMsg);
                         if (pendingMessageCallback != null) {
                             pendingMessageCallback.accept(opSendMsg);
                         }
+                        return added;
                     }
                 };
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 84062fb..8ba2694 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -29,9 +29,7 @@ import static org.apache.pulsar.client.impl.ProducerBase.MultiSchemaMode.Auto;
 import static org.apache.pulsar.client.impl.ProducerBase.MultiSchemaMode.Enabled;
 import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
 import static org.apache.pulsar.common.protocol.Commands.readChecksum;
-
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
@@ -39,10 +37,10 @@ import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.ScheduledFuture;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -50,12 +48,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Consumer;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
@@ -99,7 +97,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     // Variable is used through the atomic updater
     private volatile long msgIdGenerator;
 
-    private final Queue<OpSendMsg> pendingMessages;
+    private final OpSendMsgQueue pendingMessages;
     private final Optional<Semaphore> semaphore;
     private volatile Timeout sendTimeout = null;
     private long createProducerTimeout;
@@ -251,8 +249,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         grabCnx();
     }
 
-    protected Queue<OpSendMsg> createPendingMessagesQueue() {
-        return new ArrayDeque<>();
+    protected OpSendMsgQueue createPendingMessagesQueue() {
+        return new OpSendMsgQueue();
     }
 
     public ConnectionHandler getConnectionHandler() {
@@ -1281,6 +1279,72 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         };
     }
 
+    /**
+     * Queue implementation that is used as the pending messages queue.
+     *
+     * This implementation postpones adding of new OpSendMsg entries that happen
+     * while the forEach call is in progress. This is needed for preventing
+     * ConcurrentModificationExceptions that would occur when the forEach action
+     * calls the add method via a callback in user code.
+     *
+     * This queue is not thread safe.
+     */
+    protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
+        private final Queue<OpSendMsg> delegate = new ArrayDeque<>();
+        private int forEachDepth = 0;
+        private List<OpSendMsg> postponedOpSendMgs;
+
+        @Override
+        public void forEach(Consumer<? super OpSendMsg> action) {
+            try {
+                // track any forEach call that is in progress in the current call stack
+                // so that adding a new item while iterating doesn't cause ConcurrentModificationException
+                forEachDepth++;
+                delegate.forEach(action);
+            } finally {
+                forEachDepth--;
+                // if this is the top-most forEach call and there are postponed items, add them
+                if (forEachDepth == 0 && postponedOpSendMgs != null && !postponedOpSendMgs.isEmpty()) {
+                    delegate.addAll(postponedOpSendMgs);
+                    postponedOpSendMgs.clear();
+                }
+            }
+        }
+
+        public boolean add(OpSendMsg o) {
+            // postpone adding to the queue while forEach iteration is in progress
+            if (forEachDepth > 0) {
+                if (postponedOpSendMgs == null) {
+                    postponedOpSendMgs = new ArrayList<>();
+                }
+                return postponedOpSendMgs.add(o);
+            } else {
+                return delegate.add(o);
+            }
+        }
+
+        public void clear() {
+            delegate.clear();
+        }
+
+        public void remove() {
+            delegate.remove();
+        }
+
+        public OpSendMsg peek() {
+            return delegate.peek();
+        }
+
+        public int size() {
+            return delegate.size();
+        }
+
+        @Override
+        public Iterator<OpSendMsg> iterator() {
+            return delegate.iterator();
+        }
+    }
+
     @Override
     public void connectionOpened(final ClientCnx cnx) {
         // we set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating the
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
new file mode 100644
index 0000000..bf45e87
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Contains unit tests for ProducerImpl.OpSendMsgQueue inner class.
+ */
+public class OpSendMsgQueueTest {
+    MessageImpl<?> message;
+
+    @BeforeClass
+    public void createMockMessage() {
+        message = mock(MessageImpl.class);
+        when(message.getUncompressedSize()).thenReturn(0);
+    }
+
+    private ProducerImpl.OpSendMsg createDummyOpSendMsg() {
+        return ProducerImpl.OpSendMsg.create(message, null, 0L, null);
+    }
+
+    @Test
+    public void shouldPostponeAddsToPreventConcurrentModificationException() {
+        // given
+        ProducerImpl.OpSendMsgQueue queue = new ProducerImpl.OpSendMsgQueue();
+        ProducerImpl.OpSendMsg opSendMsg = createDummyOpSendMsg();
+        ProducerImpl.OpSendMsg opSendMsg2 = createDummyOpSendMsg();
+        queue.add(opSendMsg);
+
+        // when
+        queue.forEach(item -> {
+            queue.add(opSendMsg2);
+        });
+
+        // then
+        assertEquals(Lists.newArrayList(queue), Arrays.asList(opSendMsg, opSendMsg2));
+    }
+
+    @Test
+    public void shouldPostponeAddsAlsoInRecursiveCalls() {
+        // given
+        ProducerImpl.OpSendMsgQueue queue = new ProducerImpl.OpSendMsgQueue();
+        ProducerImpl.OpSendMsg opSendMsg = createDummyOpSendMsg();
+        ProducerImpl.OpSendMsg opSendMsg2 = createDummyOpSendMsg();
+        ProducerImpl.OpSendMsg opSendMsg3 = createDummyOpSendMsg();
+        ProducerImpl.OpSendMsg opSendMsg4 = createDummyOpSendMsg();
+        queue.add(opSendMsg);
+
+        // when
+        queue.forEach(item -> {
+            queue.add(opSendMsg2);
+            // recursive forEach
+            queue.forEach(item2 -> {
+                queue.add(opSendMsg3);
+            });
+            queue.add(opSendMsg4);
+        });
+
+        // then
+        assertEquals(Lists.newArrayList(queue), Arrays.asList(opSendMsg, opSendMsg2, opSendMsg3, opSendMsg4));
+    }
+}

[pulsar] 05/06: [pulsar-functions-go] support set subscription position (#11990)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d5d40a2068485c341a72f223e7f1db76d1bedd30
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Sep 24 10:27:40 2021 +0800

    [pulsar-functions-go] support set subscription position (#11990)
    
    * stash
    
    * set default value
    
    (cherry picked from commit 652d1546569bd7bc2eabb0c258d7e5dc038bb334)
---
 pulsar-function-go/conf/conf.go                                  | 9 +++++----
 pulsar-function-go/conf/conf.yaml                                | 1 +
 pulsar-function-go/pf/instanceConf.go                            | 7 ++++---
 pulsar-function-go/pf/instanceConf_test.go                       | 7 ++++---
 .../apache/pulsar/functions/instance/go/GoInstanceConfig.java    | 2 ++
 .../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java   | 2 ++
 6 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index c0d1262..dfbf542 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -52,10 +52,11 @@ type Conf struct {
 	AutoACK              bool   `json:"autoAck" yaml:"autoAck"`
 	Parallelism          int32  `json:"parallelism" yaml:"parallelism"`
 	//source config
-	SubscriptionType    int32  `json:"subscriptionType" yaml:"subscriptionType"`
-	TimeoutMs           uint64 `json:"timeoutMs" yaml:"timeoutMs"`
-	SubscriptionName    string `json:"subscriptionName" yaml:"subscriptionName"`
-	CleanupSubscription bool   `json:"cleanupSubscription"  yaml:"cleanupSubscription"`
+	SubscriptionType     int32  `json:"subscriptionType" yaml:"subscriptionType"`
+	TimeoutMs            uint64 `json:"timeoutMs" yaml:"timeoutMs"`
+	SubscriptionName     string `json:"subscriptionName" yaml:"subscriptionName"`
+	CleanupSubscription  bool   `json:"cleanupSubscription"  yaml:"cleanupSubscription"`
+	SubscriptionPosition int32  `json:"subscriptionPosition" yaml:"subscriptionPosition"`
 	//source input specs
 	SourceSpecTopic            string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"`
 	SourceSchemaType           string `json:"sourceSchemaType" yaml:"sourceSchemaType"`
diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml
index 7809ab5..59ac9bb 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -41,6 +41,7 @@ subscriptionType: 0
 timeoutMs: 0
 subscriptionName: ""
 cleanupSubscription: false
+subscriptionPosition: 1
 # source input specs
 sourceSpecsTopic: persistent://public/default/topic-01
 sourceSchemaType: ""
diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go
index 1227024..2fc8d95 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -82,9 +82,10 @@ func newInstanceConf() *instanceConf {
 						},
 					},
 				},
-				TimeoutMs:           cfg.TimeoutMs,
-				SubscriptionName:    cfg.SubscriptionName,
-				CleanupSubscription: cfg.CleanupSubscription,
+				TimeoutMs:            cfg.TimeoutMs,
+				SubscriptionName:     cfg.SubscriptionName,
+				CleanupSubscription:  cfg.CleanupSubscription,
+				SubscriptionPosition: pb.SubscriptionPosition(cfg.SubscriptionPosition),
 			},
 			Sink: &pb.SinkSpec{
 				Topic:      cfg.SinkSpecTopic,
diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go
index be93239..fa87002 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -62,9 +62,10 @@ func Test_newInstanceConf(t *testing.T) {
 							},
 						},
 					},
-					TimeoutMs:           0,
-					SubscriptionName:    "",
-					CleanupSubscription: false,
+					TimeoutMs:            0,
+					SubscriptionName:     "",
+					CleanupSubscription:  false,
+					SubscriptionPosition: pb.SubscriptionPosition_EARLIEST,
 				},
 				Sink: &pb.SinkSpec{
 					Topic:      "persistent://public/default/topic-02",
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index 0cb1de2..85e73d3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.instance.go;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.pulsar.functions.proto.Function;
 
 @Setter
 @Getter
@@ -50,6 +51,7 @@ public class GoInstanceConfig {
     private long timeoutMs;
     private String subscriptionName = "";
     private boolean cleanupSubscription;
+    private int subscriptionPosition = Function.SubscriptionPosition.LATEST.getNumber();
 
     private String sourceSpecsTopic = "";
     private String sourceSchemaType = "";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 592cd55..107d5cf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -196,6 +196,8 @@ public class RuntimeUtils {
         if (instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) {
             goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
         }
+        goInstanceConfig.setSubscriptionPosition(
+                instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());
 
         if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() != null) {
             for (String inputTopic : instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {

[pulsar] 02/06: [pulsar-perf] Make it possible to disable poolMessages (#12090)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1b4b71b07564b2dd68f83e675b7c5fdd5ba4dbbe
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Sep 20 19:52:45 2021 +0300

    [pulsar-perf] Make it possible to disable poolMessages (#12090)
    
    - JCommander requires passing arity = 1 to boolean parameters that
      have a default value of true.
    
    (cherry picked from commit 898582b2a2a5f64f911c569befef92b5d56c7ed8)
---
 .../main/java/org/apache/pulsar/testclient/PerformanceConsumer.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 4045a27..59ffac3 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -172,11 +172,11 @@ public class PerformanceConsumer {
         @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
                 "used for handling connections to brokers, default is 1 thread")
         public int ioThreads = 1;
-    
+
         @Parameter(names = {"--batch-index-ack" }, description = "Enable or disable the batch index acknowledgment")
         public boolean batchIndexAck = false;
 
-        @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
+        @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message", arity = 1)
         private boolean poolMessages = true;
 
         @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")

[pulsar] 04/06: use correct line separator instead of \n (#12143)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bff59fd9b11deb7b8604486fda46ea2a04ff0b4f
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Thu Sep 23 12:43:44 2021 +0800

    use correct line separator instead of \n (#12143)
    
    (cherry picked from commit 3033ac88797697f070da4e88fcee4fceb47a4528)
---
 .../src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
index 17ec7c8..b9458bf 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
@@ -44,8 +44,8 @@ public class FutureUtilTest {
         timeoutException.printStackTrace(new PrintWriter(stringWriter, true));
         assertEquals(stringWriter.toString(),
                 "org.apache.pulsar.common.util.FutureUtil$LowOverheadTimeoutException: "
-                + "hello world\n"
-                + "\tat org.apache.pulsar.common.util.FutureUtilTest.test(...)(Unknown Source)\n");
+                + "hello world" + System.lineSeparator()
+                + "\tat org.apache.pulsar.common.util.FutureUtilTest.test(...)(Unknown Source)" + System.lineSeparator());
     }
 
     @Test

[pulsar] 01/06: Fix messages in TopicPolicies will never be cleaned up (#11928)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5b993ba1f9e135e57724141d54ba3c10f2d6990c
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Mon Sep 20 20:11:20 2021 +0800

    Fix messages in TopicPolicies will never be cleaned up (#11928)
    
    
    (cherry picked from commit 93e2db0a07b632032a36130e81d32b72136ef331)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 50 ++++++++++----
 .../systopic/TopicPoliciesSystemTopicClient.java   | 19 ++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 79 ++++++++++++++++++++++
 3 files changed, 135 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 49f934a..0e8bdd0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
@@ -95,19 +96,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             if (ex != null) {
                 result.completeExceptionally(ex);
             } else {
-                writer.writeAsync(
-                        PulsarEvent.builder()
-                                .actionType(actionType)
-                        .eventType(EventType.TOPIC_POLICY)
-                        .topicPoliciesEvent(
-                            TopicPoliciesEvent.builder()
-                                .domain(topicName.getDomain().toString())
-                                .tenant(topicName.getTenant())
-                                .namespace(topicName.getNamespaceObject().getLocalName())
-                                .topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
-                                .policies(policies)
-                                .build())
-                        .build()).whenComplete(((messageId, e) -> {
+                PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
+                CompletableFuture<MessageId> actionFuture =
+                        ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event);
+                actionFuture.whenComplete(((messageId, e) -> {
                             if (e != null) {
                                 result.completeExceptionally(e);
                             } else {
@@ -133,6 +125,21 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         return result;
     }
 
+    private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) {
+        return PulsarEvent.builder()
+                .actionType(actionType)
+                .eventType(EventType.TOPIC_POLICY)
+                .topicPoliciesEvent(
+                        TopicPoliciesEvent.builder()
+                                .domain(topicName.getDomain().toString())
+                                .tenant(topicName.getTenant())
+                                .namespace(topicName.getNamespaceObject().getLocalName())
+                                .topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
+                                .policies(policies)
+                                .build())
+                .build();
+    }
+
     private void notifyListener(Message<PulsarEvent> msg) {
         if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
             return;
@@ -314,6 +321,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     }
 
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
+        // delete policies
+        if (msg.getValue() == null) {
+            policiesCache.remove(TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()));
+            return;
+        }
         if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
             TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
             TopicName topicName =
@@ -329,7 +341,19 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                     policiesCache.put(topicName, event.getPolicies());
                     break;
                 case DELETE:
+                    // Since PR #11928, this branch is no longer needed.
+                    // However, due to compatibility, it is temporarily retained here
+                    // and can be deleted in the future.
                     policiesCache.remove(topicName);
+                    SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
+                            .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+                    systemTopicClient.newWriterAsync().thenAccept(writer
+                            -> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null))
+                            .whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> {
+                                if (ex != null) {
+                                    log.error("close writer failed ", ex);
+                                }
+                            })));
                     break;
                 case NONE:
                     break;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index 58462ea..847e4d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
@@ -88,6 +89,18 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
             return producer.newMessage().key(getEventKey(event)).value(event).sendAsync();
         }
 
+        @Override
+        public MessageId delete(PulsarEvent event) throws PulsarClientException {
+            validateActionType(event);
+            return producer.newMessage().key(getEventKey(event)).value(null).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) {
+            validateActionType(event);
+            return producer.newMessage().key(getEventKey(event)).value(null).sendAsync();
+        }
+
         private String getEventKey(PulsarEvent event) {
             return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
                 event.getTopicPoliciesEvent().getTenant(),
@@ -115,6 +128,12 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
         }
     }
 
+    private static void validateActionType(PulsarEvent event) {
+        if (event == null || !ActionType.DELETE.equals(event.getActionType())) {
+            throw new UnsupportedOperationException("The only supported ActionType is DELETE");
+        }
+    }
+
     private static class TopicPolicyReader implements Reader<PulsarEvent> {
 
         private final org.apache.pulsar.client.api.Reader<PulsarEvent> reader;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 9e2b9cf..b570bff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -59,6 +60,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -2473,6 +2475,83 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testPoliciesCanBeDeletedWithTopic() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        final String topic2 = testTopic + UUID.randomUUID();
+        pulsarClient.newProducer().topic(topic).create().close();
+        pulsarClient.newProducer().topic(topic2).create().close();
+
+        Awaitility.await().untilAsserted(() -> {
+            Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
+            Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNull();
+        });
+        // Init Topic Policies. Send 4 messages in a row, there should be only 2 messages left after compression
+        admin.topics().setMaxConsumersPerSubscription(topic, 1);
+        admin.topics().setMaxConsumersPerSubscription(topic2, 2);
+        admin.topics().setMaxConsumersPerSubscription(topic, 3);
+        admin.topics().setMaxConsumersPerSubscription(topic2, 4);
+        Awaitility.await().untilAsserted(() -> {
+            Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
+            Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNotNull();
+        });
+        String topicPoliciesTopic = "persistent://" + myNamespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicPoliciesTopic).get().get();
+        // Trigger compaction and make sure it is finished.
+        persistentTopic.triggerCompaction();
+        Field field = PersistentTopic.class.getDeclaredField("currentCompaction");
+        field.setAccessible(true);
+        CompletableFuture<Long> future = (CompletableFuture<Long>)field.get(persistentTopic);
+        Awaitility.await().untilAsserted(() -> assertTrue(future.isDone()));
+
+        Consumer consumer = pulsarClient.newConsumer()
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .readCompacted(true)
+                .topic(topicPoliciesTopic).subscriptionName("sub").subscribe();
+        int count = 0;
+        while (true) {
+            Message message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                count++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        consumer.close();
+        assertEquals(count, 2);
+
+        // Delete topic, there should be only 1 message left after compression
+        admin.topics().delete(topic, true);
+
+        Awaitility.await().untilAsserted(() ->
+                assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))));
+        persistentTopic.triggerCompaction();
+        field = PersistentTopic.class.getDeclaredField("currentCompaction");
+        field.setAccessible(true);
+        CompletableFuture<Long> future2 = (CompletableFuture<Long>)field.get(persistentTopic);
+        Awaitility.await().untilAsserted(() -> assertTrue(future2.isDone()));
+
+        consumer = pulsarClient.newConsumer()
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .readCompacted(true)
+                .topic(topicPoliciesTopic).subscriptionName("sub").subscribe();
+        count = 0;
+        while (true) {
+            Message message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                count++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        consumer.close();
+        assertEquals(count, 1);
+
+    }
+
+    @Test
     public void testPolicyIsDeleteTogetherAutomatically() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();