You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/09 12:38:26 UTC

[pulsar] branch branch-2.11 updated: [cherry-pick][branch-2.11] cherry-pick fixing can not delete namespace by force (#18307) (#18826)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8138553cefb [cherry-pick][branch-2.11] cherry-pick fixing can not delete namespace by force (#18307) (#18826)
8138553cefb is described below

commit 8138553cefbcab3614b2baa24241cb95388e125b
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Dec 9 20:38:17 2022 +0800

    [cherry-pick][branch-2.11] cherry-pick fixing can not delete namespace by force (#18307) (#18826)
    
    ### Motivation
    Cherry-pick (#18307) to release 2.11.1.
    ### Modifications
    
    Cherry-pick (#18307) to release 2.11.1.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  84 +++++++++++---
 .../broker/service/persistent/PersistentTopic.java | 128 +++++++++++----------
 .../apache/pulsar/broker/admin/NamespacesTest.java |  81 +++++++++++--
 .../broker/transaction/TransactionProduceTest.java |  29 -----
 .../pulsar/broker/transaction/TransactionTest.java |   5 +-
 5 files changed, 212 insertions(+), 115 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4d8f49be965..60d171a9819 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -50,6 +50,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.lang3.StringUtils;
@@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -471,16 +473,26 @@ public abstract class NamespacesBase extends AdminResource {
             if (!topics.isEmpty()) {
                 Set<String> partitionedTopics = new HashSet<>();
                 Set<String> nonPartitionedTopics = new HashSet<>();
+                Set<String> allSystemTopics = new HashSet<>();
+                Set<String> allPartitionedSystemTopics = new HashSet<>();
 
                 for (String topic : topics) {
                     try {
                         TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
+                            if (pulsar().getBrokerService().isSystemTopic(topicName)) {
+                                allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                continue;
+                            }
                             String partitionedTopic = topicName.getPartitionedTopicName();
                             if (!partitionedTopics.contains(partitionedTopic)) {
                                 partitionedTopics.add(partitionedTopic);
                             }
                         } else {
+                            if (pulsar().getBrokerService().isSystemTopic(topicName)) {
+                                allSystemTopics.add(topic);
+                                continue;
+                            }
                             nonPartitionedTopics.add(topic);
                         }
                         topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
@@ -508,21 +520,24 @@ public abstract class NamespacesBase extends AdminResource {
                 }
 
                 final CompletableFuture<Throwable> topicFutureEx =
-                        FutureUtil.waitForAll(topicFutures).handle((result, exception) -> {
-                            if (exception != null) {
-                                if (exception.getCause() instanceof PulsarAdminException) {
-                                    asyncResponse
-                                            .resume(new RestException((PulsarAdminException) exception.getCause()));
-                                } else {
-                                    log.error("[{}] Failed to remove forcefully owned namespace {}",
-                                            clientAppId(), namespaceName, exception);
-                                    asyncResponse.resume(new RestException(exception.getCause()));
-                                }
-                                return exception;
-                            }
-
-                            return null;
-                        });
+                        FutureUtil.waitForAll(topicFutures)
+                                .thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics))
+                                .thenCompose((ignore) ->
+                                        internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                .handle((result, exception) -> {
+                                    if (exception != null) {
+                                        if (exception.getCause() instanceof PulsarAdminException) {
+                                            asyncResponse.resume(
+                                                    new RestException((PulsarAdminException) exception.getCause()));
+                                        } else {
+                                            log.error("[{}] Failed to remove forcefully owned namespace {}",
+                                                    clientAppId(), namespaceName, exception);
+                                            asyncResponse.resume(new RestException(exception.getCause()));
+                                        }
+                                        return exception;
+                                    }
+                                    return null;
+                                });
                 if (topicFutureEx.join() != null) {
                     return;
                 }
@@ -564,6 +579,45 @@ public abstract class NamespacesBase extends AdminResource {
         });
     }
 
+    private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+        log.info("internalDeletePartitionedTopicsAsync");
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            TopicName tn = TopicName.get(topicName);
+            futures.add(admin.topics().deletePartitionedTopicAsync(topicName, true, true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+    private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
+        log.info("internalDeleteTopicsAsync");
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            futures.add(admin.topics().deleteAsync(topicName, true, true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+
+
     @SuppressWarnings("deprecation")
     protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
                                                                          boolean force) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1d1160d7dfb..5ad6891d30c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1178,68 +1178,74 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 return null;
             });
 
-            closeClientFuture.thenAccept(delete -> {
-                CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
-                brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
-                deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema())
-                        .thenCompose(__ -> deleteTopicPolicies())
-                        .thenCompose(__ -> transactionBufferCleanupAndClose())
-                        .whenComplete((v, ex) -> {
-                            if (ex != null) {
-                                log.error("[{}] Error deleting topic", topic, ex);
-                                unfenceTopicToResume();
-                                deleteFuture.completeExceptionally(ex);
-                            } else {
-                                List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
-                                subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));
-
-                                FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
-                                    if (e != null) {
-                                        log.error("[{}] Error deleting topic", topic, e);
-                                        unfenceTopicToResume();
-                                        deleteFuture.completeExceptionally(e);
-                                    } else {
-                                        ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
-                                            @Override
-                                            public void deleteLedgerComplete(Object ctx) {
-                                                brokerService.removeTopicFromCache(PersistentTopic.this);
-
-                                                dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-                                                subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-                                                unregisterTopicPolicyListener();
-
-                                                log.info("[{}] Topic deleted", topic);
-                                                deleteFuture.complete(null);
-                                            }
-
-                                            @Override
-                                            public void deleteLedgerFailed(ManagedLedgerException exception,
-                                                                           Object ctx) {
-                                                if (exception.getCause()
-                                                        instanceof MetadataStoreException.NotFoundException) {
-                                                    log.info("[{}] Topic is already deleted {}",
-                                                            topic, exception.getMessage());
-                                                    deleteLedgerComplete(ctx);
-                                                } else {
-                                                    unfenceTopicToResume();
-                                                    log.error("[{}] Error deleting topic", topic, exception);
-                                                    deleteFuture.completeExceptionally(
-                                                            new PersistenceException(exception));
+                closeClientFuture.thenAccept(__ -> {
+                    CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
+                    brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
+                    deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
+                            .thenCompose(ignore -> {
+                                if (!this.getBrokerService().getPulsar().getBrokerService()
+                                        .isSystemTopic(TopicName.get(topic))) {
+                                    return deleteTopicPolicies();
+                                } else {
+                                    return CompletableFuture.completedFuture(null);
+                                }
+                                })
+                            .thenCompose(ignore -> transactionBufferCleanupAndClose())
+                            .whenComplete((v, ex) -> {
+                                if (ex != null) {
+                                    log.error("[{}] Error deleting topic", topic, ex);
+                                    unfenceTopicToResume();
+                                    deleteFuture.completeExceptionally(ex);
+                                } else {
+                                    List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
+                                    subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));
+                                    FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+                                        if (e != null) {
+                                            log.error("[{}] Error deleting topic", topic, e);
+                                            unfenceTopicToResume();
+                                            deleteFuture.completeExceptionally(e);
+                                        } else {
+                                            ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
+                                                @Override
+                                                public void deleteLedgerComplete(Object ctx) {
+                                                    brokerService.removeTopicFromCache(PersistentTopic.this);
+
+                                                    dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+                                                    subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+                                                    unregisterTopicPolicyListener();
+
+                                                    log.info("[{}] Topic deleted", topic);
+                                                    deleteFuture.complete(null);
                                                 }
-                                            }
-                                        }, null);
-                                    }
-                                });
-                            }
-                        });
-            }).exceptionally(ex->{
-                unfenceTopicToResume();
-                deleteFuture.completeExceptionally(
-                        new TopicBusyException("Failed to close clients before deleting topic."));
-                return null;
-            });
+
+                                                @Override
+                                                public void deleteLedgerFailed(ManagedLedgerException exception,
+                                                                               Object ctx) {
+                                                    if (exception.getCause()
+                                                            instanceof MetadataStoreException.NotFoundException) {
+                                                        log.info("[{}] Topic is already deleted {}",
+                                                                topic, exception.getMessage());
+                                                        deleteLedgerComplete(ctx);
+                                                    } else {
+                                                        unfenceTopicToResume();
+                                                        log.error("[{}] Error deleting topic", topic, exception);
+                                                        deleteFuture.completeExceptionally(
+                                                                new PersistenceException(exception));
+                                                    }
+                                                }
+                                            }, null);
+                                         }
+                                    });
+                                }
+                            });
+                }).exceptionally(ex->{
+                    unfenceTopicToResume();
+                    deleteFuture.completeExceptionally(
+                            new TopicBusyException("Failed to close clients before deleting topic."));
+                    return null;
+                });
         } finally {
             lock.writeLock().unlock();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 93ac82ade44..dc4df8330cc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -70,6 +70,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -85,6 +86,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -111,6 +114,7 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -1389,8 +1393,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
                 OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
-        assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-                new Long(-1));
+        assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+                .getManagedLedgerOffloadThresholdInBytes(), -1L), 0);
 
         // set an override for the namespace
         admin.namespaces().setOffloadThreshold(namespace, 100);
@@ -1406,8 +1410,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
                 OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
-        assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-                new Long(100));
+        assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+                .getManagedLedgerOffloadThresholdInBytes(), 100L), 0);
 
         // set another negative value to disable
         admin.namespaces().setOffloadThreshold(namespace, -2);
@@ -1422,8 +1426,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
                 OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
-        assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-                new Long(-2));
+        assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+                .getManagedLedgerOffloadThresholdInBytes(), -2L), 0);
 
         // set back to -1 and fall back to default
         admin.namespaces().setOffloadThreshold(namespace, -1);
@@ -1438,8 +1442,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
                 OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
-        assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
-                new Long(-1));
+        assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+                .getManagedLedgerOffloadThresholdInBytes(), -1L), 0);
 
         // cleanup
         admin.topics().delete(topicName.toString(), true);
@@ -1881,4 +1885,65 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws Exception {
+        String namespace = this.testTenant + "/delete-namespace";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-namespace",
+                "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+        Field policesService = pulsar.getClass().getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. change the order of the topics in this namespace.
+        List<String> topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
+        Assert.assertTrue(topics.size() >= 2);
+        for (int i = 0; i < topics.size(); i++) {
+            if (topics.get(i).contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+                String systemTopic = topics.get(i);
+                topics.set(i, topics.get(0));
+                topics.set(0, systemTopic);
+            }
+        }
+        NamespaceService mockNamespaceService = spy(pulsar.getNamespaceService());
+        Field namespaceServiceField = pulsar.getClass().getDeclaredField("nsService");
+        namespaceServiceField.setAccessible(true);
+        namespaceServiceField.set(pulsar, mockNamespaceService);
+        doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
+        // 5. delete the namespace
+        admin.namespaces().deleteNamespace(namespace, true);
+
+    }
+
+    @Test
+    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
+        String namespace = this.testTenant + "/delete-systemTopic";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
+                "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        Field policesService = pulsar.getClass().getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. delete the policies topic and the topic wil not to clear topic polices
+        admin.topics().delete(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index d43221a64e2..0d1bbda4568 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends TransactionTestBase {
         produceTest(true);
     }
 
-    @Test
-    public void testDeleteNamespaceBeforeCommit() throws Exception {
-        final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
-        PulsarClient pulsarClient = this.pulsarClient;
-        Transaction tnx = pulsarClient.newTransaction()
-                .withTransactionTimeout(60, TimeUnit.SECONDS)
-                .build().get();
-        long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
-        long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
-        Assert.assertTrue(txnIdMostBits > -1);
-        Assert.assertTrue(txnIdLeastBits > -1);
-
-        @Cleanup
-        Producer<byte[]> outProducer = pulsarClient
-                .newProducer()
-                .topic(topic)
-                .sendTimeout(0, TimeUnit.SECONDS)
-                .enableBatching(false)
-                .create();
-
-        String content = "Hello Txn";
-        outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
-        try {
-            deleteNamespaceGraceFully(NAMESPACE1, true);
-        } catch (Exception ignore) {}
-        tnx.commit().get();
-    }
-
     @Test
     public void produceAndAbortTest() throws Exception {
         produceTest(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 307244a6447..f0417575446 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -166,7 +166,8 @@ public class TransactionTest extends TransactionTestBase {
     public void testCreateTransactionSystemTopic() throws Exception {
         String subName = "test";
         String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
-
+        admin.namespaces().deleteNamespace(NAMESPACE1, true);
+        admin.namespaces().createNamespace(NAMESPACE1);
         try {
             // init pending ack
             @Cleanup
@@ -182,7 +183,7 @@ public class TransactionTest extends TransactionTestBase {
 
         // getList does not include transaction system topic
         List<String> list = admin.topics().getList(NAMESPACE1);
-        assertEquals(list.size(), 2);
+        assertFalse(list.isEmpty());
         list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
 
         try {