You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by Dave Fisher <wa...@apache.org> on 2022/04/25 20:03:45 UTC

{CHERRY-PICK concerns] Re: [pulsar] branch master updated: [improve][broker] Tidy up the system topic. (#15252)

I am not saying we need to cherry pick this change. I want to note that this seemingly small change does touch a large number of files with 44 changed. Changing that many files may impact future changes that will need to be cherry picked. Those cherry picks of future PRs to 2.8.4, 2.9.3, and 2.10.1 will be more difficult.

I think we need to make sure that PRs that are made to the master branch that will be cherry picked are quickly cherry picked.

Let’s keep our branches in release order!

All The Best,
Dave

> On Apr 24, 2022, at 6:39 PM, penghui@apache.org wrote:
> 
> This is an automated email from the ASF dual-hosted git repository.
> 
> penghui pushed a commit to branch master
> in repository https://gitbox.apache.org/repos/asf/pulsar.git
> 
> 
> The following commit(s) were added to refs/heads/master by this push:
>     new aa4df1b3af1 [improve][broker] Tidy up the system topic. (#15252)
> aa4df1b3af1 is described below
> 
> commit aa4df1b3af1298b087661a28b1d5e0f601ac6925
> Author: Jiwei Guo <te...@apache.org>
> AuthorDate: Mon Apr 25 09:39:42 2022 +0800
> 
>    [improve][broker] Tidy up the system topic. (#15252)
> 
>    ### Motivation
> 
>    For #13520, #14643, #14949, we fix some issues related to system topic but result in checking the system topic in different method. So it's better to tidy up the system topic.
> 
>    So put these system topic names into a new class called SystemTopicNames.
> ---
> .../apache/pulsar/PulsarClusterMetadataSetup.java  |  3 +-
> .../java/org/apache/pulsar/PulsarStandalone.java   | 17 ++++++---
> .../PulsarTransactionCoordinatorMetadataSetup.java |  5 +--
> .../org/apache/pulsar/broker/PulsarService.java    | 23 ++-----------
> .../broker/TransactionMetadataStoreService.java    |  7 ++--
> .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++--
> .../pulsar/broker/admin/impl/TransactionsBase.java | 13 +++----
> .../pulsar/broker/namespace/NamespaceService.java  | 12 +++++--
> .../ResourceUsageTopicTransportManager.java        |  8 ++---
> .../pulsar/broker/service/BrokerService.java       | 30 ++++------------
> .../SystemTopicBasedTopicPoliciesService.java      |  3 +-
> .../service/nonpersistent/NonPersistentTopic.java  |  8 ++---
> .../service/persistent/PersistentSubscription.java |  4 +--
> .../broker/service/persistent/PersistentTopic.java | 16 ++++-----
> .../broker/service/persistent/SystemTopic.java     | 12 +++----
> .../stats/prometheus/TransactionAggregator.java    |  4 +--
> .../NamespaceEventsSystemTopicFactory.java         | 10 +++---
> .../pendingack/impl/MLPendingAckStore.java         |  9 ++---
> .../pulsar/broker/web/PulsarWebResource.java       |  6 +---
> .../pulsar/broker/admin/TopicPoliciesTest.java     |  4 +--
> .../v3/AdminApiTransactionMultiBrokerTest.java     | 18 +++++++---
> .../broker/admin/v3/AdminApiTransactionTest.java   | 17 +++++----
> .../ResourceUsageTransportManagerTest.java         |  3 +-
> .../BrokerServiceAutoTopicCreationTest.java        |  6 ++--
> .../pulsar/broker/service/BrokerServiceTest.java   |  4 +--
> .../pulsar/broker/service/BrokerTestBase.java      | 15 ++++++++
> .../pulsar/broker/service/ReplicatorTest.java      | 25 ++++++++------
> .../TransactionMetadataStoreServiceTest.java       |  9 +++--
> .../broker/stats/ManagedLedgerMetricsTest.java     |  4 +--
> .../broker/stats/TransactionMetricsTest.java       | 14 ++++----
> .../NamespaceEventsSystemTopicServiceTest.java     |  4 +--
> .../systopic/PartitionedSystemTopicTest.java       |  4 +--
> .../TopicTransactionBufferRecoverTest.java         |  5 ++-
> .../broker/transaction/TransactionConsumeTest.java |  4 +--
> .../pulsar/broker/transaction/TransactionTest.java |  9 +++--
> .../broker/transaction/TransactionTestBase.java    | 14 ++++++--
> .../buffer/TransactionBufferCloseTest.java         |  5 ++-
> .../buffer/TransactionLowWaterMarkTest.java        |  8 ++---
> .../TransactionMetaStoreAssignmentTest.java        |  4 +--
> .../TransactionCoordinatorClientImpl.java          |  8 +++--
> .../SystemTopicNames.java}                         | 40 ++++++++++++++++++----
> .../org/apache/pulsar/common/naming/TopicName.java |  6 ----
> .../Oauth2PerformanceTransactionTest.java          |  9 +++--
> .../testclient/PerformanceTransactionTest.java     |  8 +++--
> 44 files changed, 234 insertions(+), 209 deletions(-)
> 
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
> index 5146b513fb5..1dbb480f29d 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
> @@ -37,6 +37,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
> import org.apache.pulsar.broker.resources.TenantResources;
> import org.apache.pulsar.common.conf.InternalConfigurationData;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> import org.apache.pulsar.common.policies.data.ClusterData;
> @@ -311,7 +312,7 @@ public class PulsarClusterMetadataSetup {
>         createNamespaceIfAbsent(resources, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);
> 
>         // Create transaction coordinator assign partitioned topic
> -        createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
> +        createPartitionedTopic(configStore, SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
>                 arguments.numTransactionCoordinators);
> 
>         localStore.close();
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
> index f94491235e6..f8d1548c842 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
> @@ -19,7 +19,7 @@
> package org.apache.pulsar;
> 
> import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
> -import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
> import com.beust.jcommander.Parameter;
> import com.google.common.collect.Sets;
> import java.io.File;
> @@ -30,9 +30,11 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
> import org.apache.logging.log4j.LogManager;
> import org.apache.pulsar.broker.PulsarService;
> import org.apache.pulsar.broker.ServiceConfiguration;
> +import org.apache.pulsar.broker.resources.NamespaceResources;
> import org.apache.pulsar.client.admin.PulsarAdmin;
> import org.apache.pulsar.client.admin.PulsarAdminException;
> import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> import org.apache.pulsar.common.policies.data.ClusterData;
> import org.apache.pulsar.common.policies.data.TenantInfo;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> @@ -311,10 +313,15 @@ public class PulsarStandalone implements AutoCloseable {
>         createNameSpace(cluster, TopicName.PUBLIC_TENANT, TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE);
>         //create pulsar system namespace
>         createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE.toString());
> -        if (config.isTransactionCoordinatorEnabled() && !admin.namespaces()
> -                .getTopics(SYSTEM_NAMESPACE.toString())
> -                .contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) {
> -            admin.topics().createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
> +        if (config.isTransactionCoordinatorEnabled()) {
> +            NamespaceResources.PartitionedTopicResources partitionedTopicResources =
> +                    broker.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
> +            Optional<PartitionedTopicMetadata> getResult =
> +                    partitionedTopicResources.getPartitionedTopicMetadataAsync(TRANSACTION_COORDINATOR_ASSIGN).get();
> +            if (!getResult.isPresent()) {
> +                partitionedTopicResources.createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN,
> +                        new PartitionedTopicMetadata(1));
> +            }
>         }
> 
>         log.debug("--- setup completed ---");
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
> index 0f66bd01ee0..66607dd4c0a 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
> @@ -22,7 +22,7 @@ import com.beust.jcommander.JCommander;
> import com.beust.jcommander.Parameter;
> import org.apache.pulsar.broker.resources.PulsarResources;
> import org.apache.pulsar.common.naming.NamespaceName;
> -import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.util.CmdGenerateDocs;
> import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
> 
> @@ -103,7 +103,8 @@ public class PulsarTransactionCoordinatorMetadataSetup {
>                     arguments.cluster);
> 
>             // Create transaction coordinator assign partitioned topic
> -            PulsarClusterMetadataSetup.createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
> +            PulsarClusterMetadataSetup.createPartitionedTopic(configStore,
> +                    SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
>                     arguments.numTransactionCoordinators);
>         }
> 
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
> index 35cd22d4bbe..fc5feedc5e1 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
> @@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
> import static org.apache.commons.lang3.StringUtils.isBlank;
> import static org.apache.commons.lang3.StringUtils.isNotBlank;
> import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
> -import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
> import com.google.common.annotations.VisibleForTesting;
> import com.google.common.collect.ImmutableMap;
> import com.google.common.collect.Lists;
> @@ -113,7 +113,6 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
> import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
> import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
> import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
> -import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
> import org.apache.pulsar.broker.validator.MultipleListenerValidator;
> import org.apache.pulsar.broker.web.WebService;
> import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
> @@ -273,8 +272,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
>     private volatile CompletableFuture<Void> closeFuture;
>     // key is listener name, value is pulsar address and pulsar ssl address
>     private Map<String, AdvertisedListener> advertisedListeners;
> -    private NamespaceName heartbeatNamespaceV1;
> -    private NamespaceName heartbeatNamespaceV2;
> 
>     public PulsarService(ServiceConfiguration config) {
>         this(config, Optional.empty(), (exitCode) -> {
> @@ -723,8 +720,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
>             createMetricsServlet();
>             this.addWebServerHandlers(webService, metricsServlet, this.config);
>             this.webService.start();
> -            heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config);
> -            heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(this.advertisedAddress, this.config);
> 
>             // Refresh addresses and update configuration, since the port might have been dynamically assigned
>             if (config.getBrokerServicePort().equals(Optional.of(0))) {
> @@ -1129,7 +1124,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
>                     .get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
>                 try {
>                     TopicName topicName = TopicName.get(topic);
> -                    if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) {
> +                    if (bundle.includes(topicName) && !isTransactionInternalName(topicName)) {
>                         CompletableFuture<Optional<Topic>> future = brokerService.getTopicIfExists(topic);
>                         if (future != null) {
>                             persistentTopics.add(future);
> @@ -1709,20 +1704,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
>         processTerminator.accept(-1);
>     }
> 
> -
> -    public static boolean isTransactionSystemTopic(TopicName topicName) {
> -        String topic = topicName.toString();
> -        return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
> -                || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
> -                || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
> -    }
> -
> -    public static boolean isTransactionInternalName(TopicName topicName) {
> -        String topic = topicName.toString();
> -        return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
> -                || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
> -    }
> -
>     @VisibleForTesting
>     protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
>         return new BrokerService(pulsar, ioEventLoopGroup);
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
> index 902546958c5..569f5900342 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
> @@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
> import org.apache.pulsar.common.api.proto.TxnAction;
> import org.apache.pulsar.common.naming.NamespaceBundle;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.util.FutureUtil;
> import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
> @@ -124,7 +125,7 @@ public class TransactionMetadataStoreService {
>                             if (ex == null) {
>                                 for (String topic : topics) {
>                                     TopicName name = TopicName.get(topic);
> -                                    if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
> +                                    if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
>                                             .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
>                                             && name.isPartitioned()) {
>                                         handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex()));
> @@ -144,7 +145,7 @@ public class TransactionMetadataStoreService {
>                             if (ex == null) {
>                                 for (String topic : topics) {
>                                     TopicName name = TopicName.get(topic);
> -                                    if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
> +                                    if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
>                                             .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
>                                             && name.isPartitioned()) {
>                                         removeTransactionMetadataStore(
> @@ -170,7 +171,7 @@ public class TransactionMetadataStoreService {
>             if (stores.get(tcId) != null) {
>                 completableFuture.complete(null);
>             } else {
> -                pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
> +                pulsarService.getBrokerService().checkTopicNsOwnership(SystemTopicNames
>                         .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString())
>                         .thenRun(() -> internalPinnedExecutor.execute(() -> {
>                     final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
> index 909c7de8d74..5ed19f5ed96 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
> @@ -19,9 +19,9 @@
> package org.apache.pulsar.broker.admin.impl;
> 
> import static java.util.concurrent.TimeUnit.SECONDS;
> -import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
> import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
> -import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
> import com.fasterxml.jackson.core.JsonProcessingException;
> import com.github.zafarkhaja.semver.Version;
> import com.google.common.collect.Lists;
> @@ -723,7 +723,7 @@ public class PersistentTopicsBase extends AdminResource {
>        future.thenAccept(__ -> {
>            // If the topic name is a partition name, no need to get partition topic metadata again
>            if (topicName.isPartitioned()) {
> -               if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
> +               if (isTransactionCoordinatorAssign(topicName)) {
>                    internalUnloadTransactionCoordinatorAsync(asyncResponse, authoritative);
>                } else {
>                    internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
> index 8eff6815404..bbd2e0be9c3 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
> @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.web.RestException;
> import org.apache.pulsar.client.admin.Transactions;
> import org.apache.pulsar.client.api.transaction.TxnID;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicDomain;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.TransactionBufferStats;
> @@ -68,7 +69,7 @@ public abstract class TransactionsBase extends AdminResource {
>     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
>                                                Integer coordinatorId) {
>         if (coordinatorId != null) {
> -            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
> +            validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
>                     authoritative);
>             TransactionMetadataStore transactionMetadataStore =
>                     pulsar().getTransactionMetadataStoreService().getStores()
> @@ -80,7 +81,7 @@ public abstract class TransactionsBase extends AdminResource {
>             }
>             asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
>         } else {
> -            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
> +            getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
>                     false, false).thenAccept(partitionMetadata -> {
>                 if (partitionMetadata.partitions == 0) {
>                     asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
> @@ -151,7 +152,7 @@ public abstract class TransactionsBase extends AdminResource {
>     protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
>                                                   boolean authoritative, int mostSigBits, long leastSigBits) {
>         try {
> -            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
> +            validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
>                     authoritative);
>             CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
>             TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
> @@ -260,7 +261,7 @@ public abstract class TransactionsBase extends AdminResource {
>                                                boolean authoritative, long timeout, Integer coordinatorId) {
>         try {
>             if (coordinatorId != null) {
> -                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
> +                validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
>                         authoritative);
>                 TransactionMetadataStore transactionMetadataStore =
>                         pulsar().getTransactionMetadataStoreService().getStores()
> @@ -296,7 +297,7 @@ public abstract class TransactionsBase extends AdminResource {
>                     asyncResponse.resume(transactionMetadata);
>                 });
>             } else {
> -                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
> +                getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
>                         false, false).thenAccept(partitionMetadata -> {
>                     if (partitionMetadata.partitions == 0) {
>                         asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
> @@ -349,7 +350,7 @@ public abstract class TransactionsBase extends AdminResource {
>     protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative,
>                                                        boolean metadata, int coordinatorId) {
>         try {
> -            TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
> +            TopicName topicName = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
>             validateTopicOwnership(topicName, authoritative);
>             TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
>                     .getStores().get(TransactionCoordinatorID.get(coordinatorId));
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
> index a321c035c78..e1e1dc10676 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
> @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
> import static java.lang.String.format;
> import static java.util.concurrent.TimeUnit.SECONDS;
> import static org.apache.commons.lang3.StringUtils.isNotBlank;
> +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
> import com.google.common.collect.Lists;
> import com.google.common.hash.Hashing;
> import io.prometheus.client.Counter;
> @@ -1372,9 +1373,16 @@ public class NamespaceService implements AutoCloseable {
>     }
> 
>     public static boolean isSystemServiceNamespace(String namespace) {
> +        return SYSTEM_NAMESPACE.toString().equals(namespace)
> +                || SLA_NAMESPACE_PATTERN.matcher(namespace).matches()
> +                || HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
> +                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
> +    }
> +
> +    public static boolean isHeartbeatNamespace(ServiceUnitId ns) {
> +        String namespace = ns.getNamespaceObject().toString();
>         return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
> -                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches()
> -                || SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
> +                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
>     }
> 
>     public boolean registerSLANamespace() throws PulsarServerException {
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
> index 38d1bb3c602..6ce0781781a 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java
> @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.Reader;
> import org.apache.pulsar.client.api.ReaderListener;
> import org.apache.pulsar.client.api.Schema;
> import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> import org.slf4j.Logger;
> @@ -69,7 +70,7 @@ public class ResourceUsageTopicTransportManager implements ResourceUsageTranspor
>             final int sendTimeoutSecs = 10;
> 
>             return pulsarClient.newProducer(Schema.BYTEBUFFER)
> -                    .topic(RESOURCE_USAGE_TOPIC_NAME)
> +                    .topic(SystemTopicNames.RESOURCE_USAGE_TOPIC.toString())
>                     .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
>                     .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
>                     .blockIfQueueFull(false)
> @@ -123,7 +124,7 @@ public class ResourceUsageTopicTransportManager implements ResourceUsageTranspor
> 
>         public ResourceUsageReader() throws PulsarClientException {
>             consumer =  pulsarClient.newReader()
> -                    .topic(RESOURCE_USAGE_TOPIC_NAME)
> +                    .topic(SystemTopicNames.RESOURCE_USAGE_TOPIC.toString())
>                     .startMessageId(MessageId.latest)
>                     .readerListener(this)
>                     .create();
> @@ -165,7 +166,6 @@ public class ResourceUsageTopicTransportManager implements ResourceUsageTranspor
>     }
> 
>     private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTopicTransportManager.class);
> -    public  static final String RESOURCE_USAGE_TOPIC_NAME = "non-persistent://pulsar/system/resource-usage";
>     private final PulsarService pulsarService;
>     private final PulsarClient pulsarClient;
>     private final ResourceUsageWriterTask pTask;
> @@ -177,7 +177,7 @@ public class ResourceUsageTopicTransportManager implements ResourceUsageTranspor
> 
>     private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException {
>         // Create a public tenant and default namespace
> -        TopicName topicName = TopicName.get(RESOURCE_USAGE_TOPIC_NAME);
> +        TopicName topicName = SystemTopicNames.RESOURCE_USAGE_TOPIC;
> 
>         PulsarAdmin admin = pulsarService.getAdminClient();
>         ServiceConfiguration config = pulsarService.getConfig();
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
> index 27784332fdc..c3f3dc64e5a 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
> @@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
> import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
> import static org.apache.commons.collections.CollectionUtils.isEmpty;
> import static org.apache.commons.lang3.StringUtils.isNotBlank;
> -import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
> import com.google.common.annotations.VisibleForTesting;
> import com.google.common.collect.ImmutableMap;
> import com.google.common.collect.Lists;
> @@ -121,7 +121,6 @@ import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
> import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
> import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
> import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
> -import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
> import org.apache.pulsar.broker.validator.BindAddressValidator;
> import org.apache.pulsar.client.admin.PulsarAdmin;
> import org.apache.pulsar.client.admin.PulsarAdminBuilder;
> @@ -132,13 +131,13 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
> import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
> import org.apache.pulsar.common.configuration.BindAddress;
> import org.apache.pulsar.common.configuration.FieldContext;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
> import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
> import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
> import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
> import org.apache.pulsar.common.naming.NamespaceBundle;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicDomain;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> @@ -1376,7 +1375,7 @@ public class BrokerService implements Closeable {
>             return;
>         }
> 
> -        if (isTransactionSystemTopic(topicName)) {
> +        if (isTransactionInternalName(topicName)) {
>             String msg = String.format("Can not create transaction system topic %s", topic);
>             log.warn(msg);
>             pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
> @@ -1904,7 +1903,7 @@ public class BrokerService implements Closeable {
>                     this.getPulsar().getTransactionMetadataStoreService();
>             // if the store belongs to this bundle, remove and close the store
>             this.getPulsar().getTransactionMetadataStoreService().getStores().values().stream().filter(store ->
> -                    serviceUnit.includes(TopicName.TRANSACTION_COORDINATOR_ASSIGN
> +                    serviceUnit.includes(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
>                             .getPartition((int) (store.getTransactionCoordinatorID().getId()))))
>                     .map(TransactionMetadataStore::getTransactionCoordinatorID)
>                     .forEach(tcId -> closeFutures.add(metadataStoreService.removeTransactionMetadataStore(tcId)));
> @@ -2936,25 +2935,8 @@ public class BrokerService implements Closeable {
>     }
> 
>     public boolean isSystemTopic(TopicName topicName) {
> -        if (topicName.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)
> -                || topicName.getNamespaceObject().equals(pulsar.getHeartbeatNamespaceV1())
> -                || topicName.getNamespaceObject().equals(pulsar.getHeartbeatNamespaceV2())) {
> -            return true;
> -        }
> -
> -        TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
> -
> -        // event topic
> -        if (EventsTopicNames.checkTopicIsEventsNames(nonePartitionedTopicName)) {
> -            return true;
> -        }
> -
> -        String localName = nonePartitionedTopicName.getLocalName();
> -        // transaction pending ack topic
> -        if (StringUtils.endsWith(localName, MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
> -            return true;
> -        }
> -        return false;
> +        return NamespaceService.isSystemServiceNamespace(topicName.getNamespace())
> +                || SystemTopicNames.isSystemTopic(topicName);
>     }
> 
>     /**
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
> index fc0700eaf77..f2db5b6be51 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
> @@ -236,8 +236,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
>     public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
>         CompletableFuture<Void> result = new CompletableFuture<>();
>         NamespaceName namespace = namespaceBundle.getNamespaceObject();
> -        if (NamespaceService.checkHeartbeatNamespace(namespace) != null
> -                || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) {
> +        if (NamespaceService.isHeartbeatNamespace(namespace)) {
>             result.complete(null);
>             return result;
>         }
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
> index 337b4c7fc1b..1edc7ae12e8 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
> @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.LongAdder;
> import org.apache.bookkeeper.mledger.Entry;
> import org.apache.bookkeeper.mledger.Position;
> import org.apache.pulsar.broker.PulsarServerException;
> +import org.apache.pulsar.broker.namespace.NamespaceService;
> import org.apache.pulsar.broker.resources.NamespaceResources;
> import org.apache.pulsar.broker.service.AbstractReplicator;
> import org.apache.pulsar.broker.service.AbstractTopic;
> @@ -71,7 +72,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
> import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
> import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
> import org.apache.pulsar.common.api.proto.KeySharedMeta;
> -import org.apache.pulsar.common.naming.NamespaceName;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.BacklogQuota;
> import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
> @@ -536,11 +536,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
>     @Override
>     public CompletableFuture<Void> checkReplication() {
>         TopicName name = TopicName.get(topic);
> -        if (!name.isGlobal()) {
> -            return CompletableFuture.completedFuture(null);
> -        }
> -        NamespaceName heartbeatNamespace = brokerService.pulsar().getHeartbeatNamespaceV2();
> -        if (name.getNamespaceObject().equals(heartbeatNamespace)) {
> +        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
>             return CompletableFuture.completedFuture(null);
>         }
> 
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
> index ec77fe69eed..fd0922f8f71 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
> @@ -18,7 +18,7 @@
>  */
> package org.apache.pulsar.broker.service.persistent;
> 
> -import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
> import com.google.common.annotations.VisibleForTesting;
> import com.google.common.base.MoreObjects;
> import java.util.Collections;
> @@ -148,7 +148,7 @@ public class PersistentSubscription implements Subscription {
>         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
>                 ? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties);
>         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
> -                && !checkTopicIsEventsNames(TopicName.get(topicName))) {
> +                && !isEventSystemTopic(TopicName.get(topicName))) {
>             this.pendingAckHandle = new PendingAckHandleImpl(this);
>         } else {
>             this.pendingAckHandle = new PendingAckHandleDisabled();
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
> index 85ee429c026..eed8ba0d3b8 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
> @@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
> import static com.google.common.base.Preconditions.checkNotNull;
> import static org.apache.commons.lang3.StringUtils.isBlank;
> import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
> -import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
> import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
> import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
> import com.carrotsearch.hppc.ObjectObjectHashMap;
> @@ -78,6 +78,7 @@ import org.apache.bookkeeper.net.BookieId;
> import org.apache.commons.collections4.MapUtils;
> import org.apache.commons.lang3.StringUtils;
> import org.apache.pulsar.broker.PulsarServerException;
> +import org.apache.pulsar.broker.namespace.NamespaceService;
> import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
> import org.apache.pulsar.broker.service.AbstractReplicator;
> import org.apache.pulsar.broker.service.AbstractTopic;
> @@ -127,8 +128,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
> import org.apache.pulsar.common.api.proto.KeySharedMeta;
> import org.apache.pulsar.common.api.proto.MessageMetadata;
> import org.apache.pulsar.common.api.proto.TxnAction;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> -import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.BacklogQuota;
> import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
> @@ -292,7 +292,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
>         checkReplicatedSubscriptionControllerState();
>         TopicName topicName = TopicName.get(topic);
>         if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
> -                && !checkTopicIsEventsNames(topicName)) {
> +                && !isEventSystemTopic(topicName)) {
>             this.transactionBuffer = brokerService.getPulsar()
>                     .getTransactionBufferProvider().newTransactionBuffer(this);
>         } else {
> @@ -719,7 +719,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
>             }
> 
>             try {
> -                if (!topic.endsWith(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)
> +                if (!topic.endsWith(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)
>                         && !checkSubscriptionTypesEnable(subType)) {
>                     return FutureUtil.failedFuture(
>                             new NotAllowedException("Topic[{" + topic + "}] doesn't support "
> @@ -1377,11 +1377,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
>     @Override
>     public CompletableFuture<Void> checkReplication() {
>         TopicName name = TopicName.get(topic);
> -        if (!name.isGlobal()) {
> -            return CompletableFuture.completedFuture(null);
> -        }
> -        NamespaceName heartbeatNamespace = brokerService.pulsar().getHeartbeatNamespaceV2();
> -        if (name.getNamespaceObject().equals(heartbeatNamespace)) {
> +        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
>             return CompletableFuture.completedFuture(null);
>         }
> 
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
> index 988310858cd..715a684902f 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
> @@ -22,9 +22,9 @@ package org.apache.pulsar.broker.service.persistent;
> import java.util.concurrent.CompletableFuture;
> import org.apache.bookkeeper.mledger.ManagedLedger;
> import org.apache.pulsar.broker.PulsarServerException;
> +import org.apache.pulsar.broker.namespace.NamespaceService;
> import org.apache.pulsar.broker.service.BrokerService;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> -import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> 
> public class SystemTopic extends PersistentTopic {
> @@ -65,7 +65,7 @@ public class SystemTopic extends PersistentTopic {
> 
>     @Override
>     public CompletableFuture<Void> checkReplication() {
> -        if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) {
> +        if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
>             return super.checkReplication();
>         }
>         return CompletableFuture.completedFuture(null);
> @@ -75,10 +75,6 @@ public class SystemTopic extends PersistentTopic {
>     public boolean isCompactionEnabled() {
>         // All system topics are using compaction except `HealthCheck`,
>         // even though is not explicitly set in the policies.
> -        TopicName name = TopicName.get(topic);
> -        NamespaceName heartbeatNamespaceV1 = brokerService.pulsar().getHeartbeatNamespaceV1();
> -        NamespaceName heartbeatNamespaceV2 = brokerService.pulsar().getHeartbeatNamespaceV2();
> -        return !name.getNamespaceObject().equals(heartbeatNamespaceV1)
> -                && !name.getNamespaceObject().equals(heartbeatNamespaceV2);
> +        return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic));
>     }
> }
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
> index e6ac1535f43..3dbda26b0a8 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
> @@ -18,7 +18,7 @@
>  */
> package org.apache.pulsar.broker.stats.prometheus;
> 
> -import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
> import io.netty.util.concurrent.FastThreadLocal;
> import java.util.HashMap;
> import java.util.Map;
> @@ -79,7 +79,7 @@ public class TransactionAggregator {
>                             topic.getSubscriptions().values().forEach(subscription -> {
>                                 try {
>                                     localManageLedgerStats.get().reset();
> -                                    if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
> +                                    if (!isEventSystemTopic(TopicName.get(subscription.getTopic().getName()))
>                                             && subscription instanceof  PersistentSubscription
>                                             && ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) {
>                                         ManagedLedger managedLedger =
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
> index 59fb8b032e9..9e162f741b2 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
> @@ -21,8 +21,8 @@ package org.apache.pulsar.broker.systopic;
> import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
> import org.apache.pulsar.client.api.PulsarClient;
> import org.apache.pulsar.common.events.EventType;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicDomain;
> import org.apache.pulsar.common.naming.TopicName;
> import org.slf4j.Logger;
> @@ -38,7 +38,7 @@ public class NamespaceEventsSystemTopicFactory {
> 
>     public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) {
>         TopicName topicName = TopicName.get(TopicDomain.persistent.value(), namespaceName,
> -                EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
> +                SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
>         log.info("Create topic policies system topic client {}", topicName.toString());
>         return new TopicPoliciesSystemTopicClient(client, topicName);
>     }
> @@ -46,7 +46,7 @@ public class NamespaceEventsSystemTopicFactory {
>     public TransactionBufferSystemTopicClient createTransactionBufferSystemTopicClient(NamespaceName namespaceName,
>                                                    TransactionBufferSnapshotService transactionBufferSnapshotService) {
>         TopicName topicName = TopicName.get(TopicDomain.persistent.value(), namespaceName,
> -                EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
> +                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
>         log.info("Create transaction buffer snapshot client, topicName : {}", topicName.toString());
>         return new TransactionBufferSystemTopicClient(client, topicName, transactionBufferSnapshotService);
>     }
> @@ -55,10 +55,10 @@ public class NamespaceEventsSystemTopicFactory {
>         switch (eventType) {
>             case TOPIC_POLICY:
>                 return TopicName.get(TopicDomain.persistent.value(), namespaceName,
> -                        EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
> +                        SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
>             case TRANSACTION_BUFFER_SNAPSHOT:
>                 return TopicName.get(TopicDomain.persistent.value(), namespaceName,
> -                        EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
> +                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
>             default:
>                 return null;
>         }
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
> index ccc1628154c..289cd14a716 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
> @@ -45,6 +45,7 @@ import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckOp;
> import org.apache.pulsar.client.api.transaction.TxnID;
> import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
> import org.apache.pulsar.common.api.proto.CommandAck.AckType;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.jctools.queues.MessagePassingQueue;
> import org.jctools.queues.SpscArrayQueue;
> @@ -61,10 +62,6 @@ public class MLPendingAckStore implements PendingAckStore {
> 
>     private final ManagedCursor cursor;
> 
> -    public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
> -
> -    public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state";
> -
>     private final SpscArrayQueue<Entry> entryQueue;
> 
>     //this is for replay
> @@ -412,11 +409,11 @@ public class MLPendingAckStore implements PendingAckStore {
>     }
> 
>     public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) {
> -        return TopicName.get(originTopicName) + "-" + subName + PENDING_ACK_STORE_SUFFIX;
> +        return TopicName.get(originTopicName) + "-" + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
>     }
> 
>     public static String getTransactionPendingAckStoreCursorName() {
> -        return PENDING_ACK_STORE_CURSOR_NAME;
> +        return SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME;
>     }
> 
>     private static final Logger log = LoggerFactory.getLogger(MLPendingAckStore.class);
> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
> index baa5d3fe332..1690210a80a 100644
> --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
> +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
> @@ -775,11 +775,7 @@ public abstract class PulsarWebResource {
> 
>     public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
>                                                                                            NamespaceName namespace) {
> -        if (!namespace.isGlobal()) {
> -            return CompletableFuture.completedFuture(null);
> -        }
> -        NamespaceName heartbeatNamespace = pulsarService.getHeartbeatNamespaceV2();
> -        if (namespace.equals(heartbeatNamespace)) {
> +        if (!namespace.isGlobal() || NamespaceService.isHeartbeatNamespace(namespace)) {
>             return CompletableFuture.completedFuture(null);
>         }
> 
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
> index 60e591446d1..320a375145a 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
> @@ -65,8 +65,8 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
> import org.apache.pulsar.client.api.SubscriptionMode;
> import org.apache.pulsar.client.api.SubscriptionType;
> import org.apache.pulsar.common.api.proto.CommandSubscribe;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicDomain;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.BacklogQuota;
> @@ -2794,7 +2794,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
>             Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
>             Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNotNull();
>         });
> -        String topicPoliciesTopic = "persistent://" + myNamespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
> +        String topicPoliciesTopic = "persistent://" + myNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
>         PersistentTopic persistentTopic =
>                 (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicPoliciesTopic).get().get();
>         // Trigger compaction and make sure it is finished.
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
> index add277fc524..c9d554a2ca8 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
> @@ -23,7 +23,8 @@ import java.util.concurrent.TimeUnit;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.pulsar.broker.transaction.TransactionTestBase;
> import org.apache.pulsar.client.api.PulsarClient;
> -import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> +import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> import org.testng.annotations.AfterMethod;
> import org.testng.annotations.BeforeMethod;
> import org.testng.annotations.Test;
> @@ -48,11 +49,18 @@ public class AdminApiTransactionMultiBrokerTest extends TransactionTestBase {
>     @Test
>     public void testRedirectOfGetCoordinatorInternalStats() throws Exception {
>         Map<String, String> map = admin.lookups()
> -                .lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +                .lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>         while (map.containsValue(getPulsarServiceList().get(0).getBrokerServiceUrl())) {
> -            admin.topics().deletePartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> -            admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
> -            map = admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +            pulsarServiceList.get(0).getPulsarResources()
> +                    .getNamespaceResources()
> +                    .getPartitionedTopicResources()
> +                            .deletePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
> +            pulsarServiceList.get(0).getPulsarResources()
> +                    .getNamespaceResources()
> +                    .getPartitionedTopicResources()
> +                    .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
> +                            new PartitionedTopicMetadata(NUM_PARTITIONS));
> +            map = admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>         }
>         //init tc stores
>         pulsarClient = PulsarClient.builder()
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
> index aff09f2d7f1..d46b640aac3 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
> @@ -22,7 +22,6 @@ import com.google.common.collect.Sets;
> import org.apache.bookkeeper.mledger.impl.PositionImpl;
> import org.apache.http.HttpStatus;
> import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
> -import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
> import org.apache.pulsar.client.admin.PulsarAdminException;
> import org.apache.pulsar.client.api.Consumer;
> import org.apache.pulsar.client.api.Message;
> @@ -36,8 +35,10 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl;
> import org.apache.pulsar.client.impl.MessageIdImpl;
> import org.apache.pulsar.client.impl.transaction.TransactionImpl;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicDomain;
> import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> import org.apache.pulsar.common.policies.data.ClusterData;
> import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> @@ -486,14 +487,14 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
>         ManagedLedgerInternalStats managedLedgerInternalStats = stats.pendingAckLogStats.managedLedgerInternalStats;
>         assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default",
>                 "testGetPendingAckInternalStats" + "-"
> -                        + subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
> +                        + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
>                 stats.pendingAckLogStats.managedLedgerName);
> 
>         verifyManagedLegerInternalStats(managedLedgerInternalStats, 16);
> 
>         ManagedLedgerInternalStats finalManagedLedgerInternalStats = managedLedgerInternalStats;
>         managedLedgerInternalStats.cursors.forEach((s, cursorStats) -> {
> -            assertEquals(s, MLPendingAckStore.PENDING_ACK_STORE_CURSOR_NAME);
> +            assertEquals(s, SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME);
>             assertEquals(cursorStats.readPosition, finalManagedLedgerInternalStats.lastConfirmedEntry);
>         });
> 
> @@ -503,7 +504,7 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
> 
>         assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default",
>                 "testGetPendingAckInternalStats" + "-"
> -                        + subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
> +                        + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
>                 stats.pendingAckLogStats.managedLedgerName);
>         assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
>     }
> @@ -528,8 +529,12 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
>     }
> 
>     private void initTransaction(int coordinatorSize) throws Exception {
> -        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), coordinatorSize);
> -        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +        pulsar.getPulsarResources()
> +                .getNamespaceResources()
> +                .getPartitionedTopicResources()
> +                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
> +                        new PartitionedTopicMetadata(coordinatorSize));
> +        admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>         pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
>         pulsarClient.close();
>         Awaitility.await().until(() ->
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
> index 4e54d1b3326..7b96aed47a1 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
> @@ -24,6 +24,7 @@ import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
> import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
> import org.apache.pulsar.client.admin.PulsarAdminException;
> import org.apache.pulsar.client.api.PulsarClientException;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.ClusterData;
> import org.testng.annotations.AfterClass;
> @@ -55,7 +56,7 @@ public class ResourceUsageTransportManagerTest extends MockedPulsarServiceBaseTe
> 
>     @Test
>     public void testNamespaceCreation() throws Exception {
> -        TopicName topicName = TopicName.get(ResourceUsageTopicTransportManager.RESOURCE_USAGE_TOPIC_NAME);
> +        TopicName topicName = SystemTopicNames.RESOURCE_USAGE_TOPIC;
> 
>         assertTrue(admin.tenants().getTenants().contains(topicName.getTenant()));
>         assertTrue(admin.namespaces().getNamespaces(topicName.getTenant()).contains(topicName.getNamespace()));
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
> index 1b3ca1616fa..936e3e0d263 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
> @@ -25,7 +25,7 @@ import static org.testng.Assert.fail;
> import org.apache.pulsar.client.admin.PulsarAdminException;
> import org.apache.pulsar.client.api.MessageId;
> import org.apache.pulsar.client.api.PulsarClientException;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
> import org.apache.pulsar.common.policies.data.TopicType;
> @@ -395,7 +395,7 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
>         pulsar.getConfiguration().setAllowAutoTopicCreation(false);
>         pulsar.getConfiguration().setSystemTopicEnabled(true);
> 
> -        final String topicString = "persistent://prop/ns-abc/" + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
> +        final String topicString = "persistent://prop/ns-abc/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
> 
>         pulsarClient.newProducer().topic(topicString).create();
> 
> @@ -408,7 +408,7 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
>         pulsar.getConfiguration().setAllowAutoTopicCreation(false);
>         pulsar.getConfiguration().setSystemTopicEnabled(true);
> 
> -        final String topicString = "persistent://prop/ns-abc/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
> +        final String topicString = "persistent://prop/ns-abc/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
> 
>         pulsarClient.newProducer().topic(topicString).create();
> 
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
> index b4702fe0100..8731e782348 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
> @@ -18,8 +18,8 @@
>  */
> package org.apache.pulsar.broker.service;
> 
> -import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
> -import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
> import static org.mockito.Mockito.any;
> import static org.mockito.Mockito.doReturn;
> import static org.mockito.Mockito.spy;
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
> index 69db17f4693..6412d7f32a9 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
> @@ -20,8 +20,11 @@ package org.apache.pulsar.broker.service;
> 
> import org.apache.pulsar.broker.ServiceConfiguration;
> import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> +import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> import org.apache.pulsar.common.policies.data.ClusterData;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> +import org.apache.pulsar.metadata.api.MetadataStoreException;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> @@ -50,6 +53,18 @@ public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest {
>         admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", Sets.newHashSet("test"));
>     }
> 
> +    protected void createTransactionCoordinatorAssign() throws MetadataStoreException {
> +        createTransactionCoordinatorAssign(1);
> +    }
> +
> +    protected void createTransactionCoordinatorAssign(int partitions) throws MetadataStoreException {
> +        pulsar.getPulsarResources()
> +                .getNamespaceResources()
> +                .getPartitionedTopicResources()
> +                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
> +                        new PartitionedTopicMetadata(partitions));
> +    }
> +
>     void rolloverPerIntervalStats() {
>         try {
>             pulsar.getExecutor().submit(() -> pulsar.getBrokerService().updateRates()).get();
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
> index c31373c05fc..c8c257992cf 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
> @@ -82,9 +82,10 @@ import org.apache.pulsar.client.impl.ProducerImpl;
> import org.apache.pulsar.client.impl.PulsarClientImpl;
> import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
> import org.apache.pulsar.client.impl.transaction.TransactionImpl;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> import org.apache.pulsar.common.policies.data.BacklogQuota;
> import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
> import org.apache.pulsar.common.policies.data.ClusterData;
> @@ -948,23 +949,23 @@ public class ReplicatorTest extends ReplicatorTestBase {
>         admin2.topics().updatePartitionedTopic(persistentTopicName, 5);
>         assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
>         assertEquals((int) admin2.topics().getList(namespace).stream().filter(topic ->
> -                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 5);
> +                !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 5);
>         // Update partitioned topic from R3
>         admin3.topics().updatePartitionedTopic(persistentTopicName, 6);
>         assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
>         assertEquals(admin3.topics().getList(namespace).stream().filter(topic ->
> -                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 6);
> +                !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 6);
>         // Update partitioned topic from R1
>         admin1.topics().updatePartitionedTopic(persistentTopicName, 7);
>         assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 7);
>         assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 7);
>         assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 7);
>         assertEquals(admin1.topics().getList(namespace).stream().filter(topic ->
> -                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
> +                !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
>         assertEquals(admin2.topics().getList(namespace).stream().filter(topic ->
> -                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
> +                !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
>         assertEquals(admin3.topics().getList(namespace).stream().filter(topic ->
> -                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
> +                !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
>     }
> 
>     /**
> @@ -1299,7 +1300,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
>         String topic = TopicName.get("persistent", NamespaceName.get(namespace),
>                 "testDoesNotReplicateSystemTopic").toString();
>         String systemTopic = TopicName.get("persistent", NamespaceName.get(namespace),
> -                EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT).toString();
> +                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT).toString();
>         admin1.topics().createNonPartitionedTopic(topic);
>         // Replicator will not replicate System Topic other than topic policies
>         initTransaction(2, admin1, pulsar1.getBrokerServiceUrl(), pulsar1);
> @@ -1326,8 +1327,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
>     private void initTransaction(int coordinatorSize, PulsarAdmin admin, String ServiceUrl,
>                                  PulsarService pulsarService) throws Exception {
>         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString(), coordinatorSize);
> -        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), coordinatorSize);
> -        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +        pulsarService.getPulsarResources()
> +                .getNamespaceResources()
> +                .getPartitionedTopicResources()
> +                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
> +                        new PartitionedTopicMetadata(coordinatorSize));
> +        admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(ServiceUrl).enableTransaction(true).build();
>         pulsarClient.close();
>         Awaitility.await().until(() ->
> @@ -1361,7 +1366,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
>         Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
>             list.clear();
>             list.addAll(admin.topics().getList(namespace).stream()
> -                    .filter(topic -> !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME))
> +                    .filter(topic -> !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME))
>                     .collect(Collectors.toList()));
>             return list.size() == expectedTopicList.size();
>         });
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
> index e50a39c5dfd..cd073737cec 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
> @@ -29,7 +29,6 @@ import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ConcurrentSkipListMap;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.TimeUnit;
> -
> import com.google.common.collect.Sets;
> import java.util.concurrent.TimeoutException;
> import org.apache.bookkeeper.mledger.Position;
> @@ -39,7 +38,7 @@ import org.apache.pulsar.broker.TransactionMetadataStoreService;
> import org.apache.pulsar.client.api.transaction.TxnID;
> import org.apache.pulsar.common.api.proto.TxnAction;
> import org.apache.pulsar.common.naming.NamespaceName;
> -import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
> import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
> @@ -66,8 +65,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
>         super.baseSetup(configuration);
>         admin.tenants().createTenant("pulsar", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
>         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
> -        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
> -        admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +        createTransactionCoordinatorAssign(16);
> +        admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>     }
> 
>     @AfterMethod(alwaysRun = true)
> @@ -81,7 +80,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
>         TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
>         Assert.assertNotNull(transactionMetadataStoreService);
> 
> -        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString());
> +        admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString());
>         transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
>         Awaitility.await().until(() ->
>                 transactionMetadataStoreService.getStores().size() == 1);
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
> index 98ac2618eea..6da6026b34f 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
> @@ -21,7 +21,6 @@ package org.apache.pulsar.broker.stats;
> import java.util.List;
> import java.util.Map.Entry;
> import java.util.concurrent.TimeUnit;
> -
> import com.google.common.collect.Sets;
> import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
> import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
> @@ -31,7 +30,6 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
> import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
> import org.apache.pulsar.client.api.Producer;
> import org.apache.pulsar.common.naming.NamespaceName;
> -import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> import org.apache.pulsar.common.stats.Metrics;
> import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
> @@ -99,7 +97,7 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase {
>         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
>                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
>         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
> -        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
> +        createTransactionCoordinatorAssign();
>         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
>         managedLedgerConfig.setMaxEntriesPerLedger(2);
>         new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
> index 0c9f877150a..668c9254b49 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
> @@ -40,7 +40,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
> import org.apache.pulsar.client.impl.transaction.TransactionImpl;
> import org.apache.pulsar.common.api.proto.TxnAction;
> import org.apache.pulsar.common.naming.NamespaceName;
> -import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.policies.data.TenantInfo;
> import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
> import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
> @@ -78,7 +78,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
>                         .allowedClusters(Sets.newHashSet("test"))
>                         .build());
>         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
> -        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
> +        createTransactionCoordinatorAssign();
>     }
> 
>     @AfterMethod(alwaysRun = true)
> @@ -90,7 +90,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
>     @Test
>     public void testTransactionCoordinatorMetrics() throws Exception{
>         long timeout = 10000;
> -        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +        admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>         TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
>         TransactionCoordinatorID transactionCoordinatorIDTwo = TransactionCoordinatorID.get(1);
>         pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
> @@ -127,7 +127,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
>         String topic = "persistent://" + ns1 + "/test_coordinator_metrics";
>         String subName = "test_coordinator_metrics";
>         TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
> -        admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +        admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>         pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
>         admin.topics().createNonPartitionedTopic(topic);
>         admin.topics().createSubscription(topic, subName, MessageId.earliest);
> @@ -229,7 +229,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
>         String topic = "persistent://" + ns1 + "/test_managed_ledger_metrics";
>         String subName = "test_managed_ledger_metrics";
>         admin.topics().createNonPartitionedTopic(topic);
> -        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +        admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>         TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
>         pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
>         admin.topics().createSubscription(topic, subName, MessageId.earliest);
> @@ -291,7 +291,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
>         String subName = "test_managed_ledger_metrics";
>         String subName2 = "test_pending_ack_no_init";
>         admin.topics().createNonPartitionedTopic(topic);
> -        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +        admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>         TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
>         pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
>         admin.topics().createSubscription(topic, subName, MessageId.earliest);
> @@ -352,7 +352,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
> 
>     @Test
>     public void testDuplicateMetricTypeDefinitions() throws Exception{
> -        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +        admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
>         TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
>         TransactionCoordinatorID transactionCoordinatorIDTwo = TransactionCoordinatorID.get(1);
>         pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
> index b680d74b86b..244d7587a42 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
> @@ -30,10 +30,10 @@ import org.apache.pulsar.client.api.Reader;
> import org.apache.pulsar.client.api.Schema;
> import org.apache.pulsar.common.events.ActionType;
> import org.apache.pulsar.common.events.EventType;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> import org.apache.pulsar.common.events.PulsarEvent;
> import org.apache.pulsar.common.events.TopicPoliciesEvent;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.ClusterData;
> import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
> @@ -138,7 +138,7 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
> 
>     @Test(timeOut = 30000)
>     public void checkSystemTopic() throws PulsarAdminException {
> -        final String systemTopic = "persistent://" + NAMESPACE1 + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
> +        final String systemTopic = "persistent://" + NAMESPACE1 + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
>         final String normalTopic = "persistent://" + NAMESPACE1 + "/normal_topic";
>         admin.topics().createPartitionedTopic(normalTopic, 3);
>         TopicName systemTopicName = TopicName.get(systemTopic);
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
> index d506f712767..8923d01d92b 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
> @@ -33,8 +33,8 @@ import org.apache.pulsar.client.api.Message;
> import org.apache.pulsar.client.api.Producer;
> import org.apache.pulsar.client.api.SubscriptionInitialPosition;
> import org.apache.pulsar.client.api.SubscriptionType;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.policies.data.TenantInfo;
> import org.apache.pulsar.client.api.MessageId;
> import org.apache.pulsar.common.naming.TopicName;
> @@ -88,7 +88,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
>         SystemTopicClient.Reader reader = systemTopicClientForNamespace.newReader();
> 
>         int partitions = admin.topics().getPartitionedTopicMetadata(
> -                String.format("persistent://%s/%s", ns, EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions;
> +                String.format("persistent://%s/%s", ns, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions;
>         Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 1);
>         Assert.assertEquals(partitions, PARTITIONS);
>         Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
> index 78259c23b8a..3bc31bbc4fc 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
> @@ -18,7 +18,7 @@
>  */
> package org.apache.pulsar.broker.transaction;
> 
> -import static org.apache.pulsar.common.events.EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
> import static org.mockito.ArgumentMatchers.any;
> import static org.mockito.Mockito.doReturn;
> import static org.mockito.Mockito.doThrow;
> @@ -67,7 +67,6 @@ import org.apache.pulsar.client.api.transaction.TxnID;
> import org.apache.pulsar.client.impl.MessageIdImpl;
> import org.apache.pulsar.client.impl.transaction.TransactionImpl;
> import org.apache.pulsar.common.events.EventType;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.TopicStats;
> import org.apache.pulsar.common.util.FutureUtil;
> @@ -252,7 +251,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
>         ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
>                 .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
>                 .startMessageId(MessageId.earliest)
> -                .topic(NAMESPACE1 + "/" + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
> +                .topic(NAMESPACE1 + "/" + TRANSACTION_BUFFER_SNAPSHOT);
>         Reader<TransactionBufferSnapshot> reader = readerBuilder.create();
> 
>         MessageId messageId1 = producer.newMessage(tnx1).value("test").send();
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
> index 24e5c5aaf1e..32efdb51659 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
> @@ -19,7 +19,6 @@
> package org.apache.pulsar.broker.transaction;
> 
> import static java.nio.charset.StandardCharsets.UTF_8;
> -
> import com.google.common.collect.Sets;
> import io.netty.buffer.ByteBuf;
> import io.netty.buffer.Unpooled;
> @@ -45,7 +44,6 @@ import org.apache.pulsar.common.api.proto.MessageIdData;
> import org.apache.pulsar.common.api.proto.MessageMetadata;
> import org.apache.pulsar.common.api.proto.TxnAction;
> import org.apache.pulsar.common.naming.NamespaceName;
> -import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.ClusterData;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> import org.apache.pulsar.common.protocol.Commands;
> @@ -82,7 +80,7 @@ public class TransactionConsumeTest extends TransactionTestBase {
>         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
>                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
>         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
> -        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
> +        createTransactionCoordinatorAssign(1);
>     }
> 
>     @AfterMethod(alwaysRun = true)
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
> index 61d69661d62..60ecf04e602 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
> @@ -19,7 +19,7 @@
> package org.apache.pulsar.broker.transaction;
> 
> import static java.nio.charset.StandardCharsets.UTF_8;
> -import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
> +import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
> import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
> import static org.mockito.ArgumentMatchers.any;
> import static org.mockito.ArgumentMatchers.anyInt;
> @@ -32,7 +32,6 @@ import static org.testng.Assert.assertEquals;
> import static org.testng.Assert.assertFalse;
> import static org.testng.Assert.assertTrue;
> import static org.testng.Assert.fail;
> -
> import io.netty.buffer.Unpooled;
> import io.netty.util.Timeout;
> import java.lang.reflect.Field;
> @@ -93,8 +92,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
> import org.apache.pulsar.common.api.proto.CommandSubscribe;
> import org.apache.pulsar.client.impl.transaction.TransactionImpl;
> import org.apache.pulsar.common.events.EventType;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicDomain;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
> @@ -230,7 +229,7 @@ public class TransactionTest extends TransactionTestBase {
> 
>         Assert.assertNull(topics.get(TopicName.get(TopicDomain.persistent.value(),
>                 NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString() + 0));
> -        Assert.assertNull(topics.get(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()));
> +        Assert.assertNull(topics.get(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()));
>         Assert.assertNull(topics.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName)));
>     }
> 
> @@ -347,7 +346,7 @@ public class TransactionTest extends TransactionTestBase {
>         ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
>                 .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
>                 .startMessageId(MessageId.earliest)
> -                .topic(NAMESPACE1 + "/" + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
> +                .topic(NAMESPACE1 + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
>         Reader<TransactionBufferSnapshot> reader = readerBuilder.create();
> 
>         long waitSnapShotTime = getPulsarServiceList().get(0).getConfiguration()
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
> index bdcde1be814..dc034daa061 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
> @@ -50,9 +50,11 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
> import org.apache.pulsar.client.admin.PulsarAdmin;
> import org.apache.pulsar.client.api.PulsarClient;
> import org.apache.pulsar.common.naming.NamespaceName;
> -import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> +import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> import org.apache.pulsar.common.policies.data.ClusterData;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> +import org.apache.pulsar.metadata.api.MetadataStoreException;
> import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
> import org.apache.pulsar.metadata.impl.ZKMetadataStore;
> import org.apache.pulsar.tests.TestRetrySupport;
> @@ -123,7 +125,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {
>         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
>                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
>         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
> -        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), numPartitionsOfTC);
> +        createTransactionCoordinatorAssign(numPartitionsOfTC);
>         if (topic != null) {
>             admin.tenants().createTenant(TENANT,
>                     new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
> @@ -144,6 +146,14 @@ public abstract class TransactionTestBase extends TestRetrySupport {
>                 .build();
>     }
> 
> +    protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) throws MetadataStoreException {
> +        pulsarServiceList.get(0).getPulsarResources()
> +                .getNamespaceResources()
> +                .getPartitionedTopicResources()
> +                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
> +                        new PartitionedTopicMetadata(numPartitionsOfTC));
> +    }
> +
>     protected void startBroker() throws Exception {
>         for (int i = 0; i < brokerCount; i++) {
>             conf.setClusterName(CLUSTER_NAME);
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
> index 43d31e7f9ff..77ac464c1d4 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java
> @@ -26,8 +26,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
> import org.apache.pulsar.client.api.PulsarClientException;
> import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
> import org.apache.pulsar.client.impl.PulsarClientImpl;
> -import org.apache.pulsar.common.events.EventsTopicNames;
> import org.apache.pulsar.common.naming.NamespaceName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicDomain;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.PublisherStats;
> @@ -38,7 +38,6 @@ import org.testng.annotations.AfterMethod;
> import org.testng.annotations.BeforeMethod;
> import org.testng.annotations.DataProvider;
> import org.testng.annotations.Test;
> -
> import java.util.List;
> import java.util.concurrent.TimeUnit;
> 
> @@ -110,7 +109,7 @@ public class TransactionBufferCloseTest extends TransactionTestBase {
> 
>     private void checkSnapshotPublisherCount(String namespace, int expectCount) throws PulsarAdminException {
>         TopicName snTopicName = TopicName.get(TopicDomain.persistent.value(), NamespaceName.get(namespace),
> -                EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
> +                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
>         List<PublisherStats> publisherStatsList =
>                 (List<PublisherStats>) admin.topics()
>                         .getStats(snTopicName.getPartitionedTopicName()).getPublishers();
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
> index 873509ff6bf..192353a2da0 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
> @@ -31,7 +31,6 @@ import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import lombok.Cleanup;
> import lombok.extern.slf4j.Slf4j;
> -
> import org.apache.bookkeeper.mledger.impl.PositionImpl;
> import org.apache.commons.collections4.map.LinkedMap;
> import org.apache.pulsar.broker.service.BrokerService;
> @@ -49,9 +48,8 @@ import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
> import org.apache.pulsar.client.api.transaction.TxnID;
> import org.apache.pulsar.client.impl.PulsarClientImpl;
> import org.apache.pulsar.client.impl.transaction.TransactionImpl;
> -import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> -
> import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
> import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
> import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
> @@ -144,7 +142,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
> 
>         PartitionedTopicMetadata partitionedTopicMetadata =
>                 ((PulsarClientImpl) pulsarClient).getLookup()
> -                        .getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
> +                        .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get();
>         Transaction lowWaterMarkTxn = null;
>         for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
>             lowWaterMarkTxn = pulsarClient.newTransaction()
> @@ -249,7 +247,7 @@ public class TransactionLowWaterMarkTest extends TransactionTestBase {
> 
>         PartitionedTopicMetadata partitionedTopicMetadata =
>                 ((PulsarClientImpl) pulsarClient).getLookup()
> -                        .getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
> +                        .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get();
>         Transaction lowWaterMarkTxn = null;
>         for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
>             lowWaterMarkTxn = pulsarClient.newTransaction()
> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
> index 01027868e27..c95becc2b3d 100644
> --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
> +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
> @@ -24,7 +24,7 @@ import org.apache.pulsar.broker.PulsarService;
> import org.apache.pulsar.broker.transaction.TransactionTestBase;
> import org.apache.pulsar.client.api.PulsarClient;
> import org.apache.pulsar.client.api.ServiceUrlProvider;
> -import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
> import org.awaitility.Awaitility;
> import org.testng.Assert;
> @@ -81,7 +81,7 @@ public class TransactionMetaStoreAssignmentTest extends TransactionTestBase {
>         // close pulsar client will not init tc again
>         pulsarClient.close();
> 
> -        admin.topics().unload(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +        admin.topics().unload(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
> 
>         for (int i = 0; i < 16; i++) {
>             final int f = i;
> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
> index 432fc671071..54a5e07405e 100644
> --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
> +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
> @@ -36,6 +36,7 @@ import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
> import org.apache.pulsar.client.util.MathUtils;
> import org.apache.pulsar.common.api.proto.Subscription;
> import org.apache.pulsar.common.api.proto.TxnAction;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.util.FutureUtil;
> import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
> @@ -78,7 +79,7 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
>     @Override
>     public CompletableFuture<Void> startAsync() {
>         if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
> -            return pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN)
> +            return pulsarClient.getLookup().getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)
>                 .thenCompose(partitionMeta -> {
>                     List<CompletableFuture<Void>> connectFutureList = new ArrayList<>();
>                     if (LOG.isDebugEnabled()) {
> @@ -116,9 +117,10 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
> 
>     private String getTCAssignTopicName(int partition) {
>         if (partition >= 0) {
> -            return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
> +            return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()
> +                    + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
>         } else {
> -            return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString();
> +            return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString();
>         }
>     }
> 
> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
> similarity index 52%
> rename from pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
> rename to pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
> index 7ed6308ec04..eaab8261460 100644
> --- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
> +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
> @@ -16,17 +16,16 @@
>  * specific language governing permissions and limitations
>  * under the License.
>  */
> -package org.apache.pulsar.common.events;
> +package org.apache.pulsar.common.naming;
> 
> import com.google.common.collect.Sets;
> import java.util.Collections;
> import java.util.Set;
> -import org.apache.pulsar.common.naming.TopicName;
> 
> /**
> - * System topic names for each {@link EventType}.
> + * Encapsulate the parsing of the completeTopicName name.
>  */
> -public class EventsTopicNames {
> +public class SystemTopicNames {
> 
>     /**
>      * Local topic name for the namespace events.
> @@ -38,19 +37,34 @@ public class EventsTopicNames {
>      */
>     public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
> 
> +
> +    public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
> +
> +    public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state";
> +
>     /**
>      * The set of all local topic names declared above.
>      */
>     public static final Set<String> EVENTS_TOPIC_NAMES =
>             Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT));
> 
> -    public static boolean checkTopicIsEventsNames(TopicName topicName) {
> +
> +    public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
> +            NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
> +
> +    public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
> +            NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
> +
> +    public static final TopicName RESOURCE_USAGE_TOPIC = TopicName.get(TopicDomain.non_persistent.value(),
> +            NamespaceName.SYSTEM_NAMESPACE, "resource-usage");
> +
> +    public static boolean isEventSystemTopic(TopicName topicName) {
>         return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
>     }
> 
> -    public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) {
> +    public static boolean isTransactionCoordinatorAssign(TopicName topicName) {
>         return topicName != null && topicName.toString()
> -                .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
> +                .startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString());
>     }
> 
>     public static boolean isTopicPoliciesSystemTopic(String topic) {
> @@ -59,4 +73,16 @@ public class EventsTopicNames {
>         }
>         return TopicName.get(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
>     }
> +
> +    public static boolean isTransactionInternalName(TopicName topicName) {
> +        String topic = topicName.toString();
> +        return topic.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString())
> +                || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
> +                || topic.endsWith(PENDING_ACK_STORE_SUFFIX);
> +    }
> +
> +    public static boolean isSystemTopic(TopicName topicName) {
> +        TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
> +        return isEventSystemTopic(nonePartitionedTopicName) || isTransactionInternalName(nonePartitionedTopicName);
> +    }
> }
> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
> index 97f99b19f19..5f03c60ece9 100644
> --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
> +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
> @@ -60,12 +60,6 @@ public class TopicName implements ServiceUnitId {
>                 }
>             });
> 
> -    public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
> -            NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
> -
> -    public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
> -            NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
> -
>     public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
>         String name = domain + "://" + namespaceName.toString() + '/' + topic;
>         return TopicName.get(name);
> diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
> index e4cc0bf23e6..1cf0d7da354 100644
> --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
> +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
> @@ -44,7 +44,8 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
> import org.apache.pulsar.client.api.SubscriptionType;
> import org.apache.pulsar.client.api.TokenOauth2AuthenticatedProducerConsumerTest;
> import org.apache.pulsar.common.naming.NamespaceName;
> -import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> +import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> import org.apache.pulsar.common.policies.data.ClusterData;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> import org.apache.pulsar.common.util.ObjectMapperFactory;
> @@ -146,7 +147,11 @@ public class Oauth2PerformanceTransactionTest extends ProducerConsumerBase {
>                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
>         admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
>         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
> -        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
> +        pulsar.getPulsarResources()
> +                .getNamespaceResources()
> +                .getPartitionedTopicResources()
> +                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
> +                        new PartitionedTopicMetadata(1));
> 
>         replacePulsarClient(PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
>                 .statsInterval(0, TimeUnit.SECONDS)
> diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
> index 936f70bedf3..4ef3bba46c8 100644
> --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
> +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
> @@ -33,14 +33,14 @@ import org.apache.pulsar.client.api.Schema;
> import org.apache.pulsar.client.api.SubscriptionInitialPosition;
> import org.apache.pulsar.client.api.SubscriptionType;
> import org.apache.pulsar.common.naming.NamespaceName;
> -import org.apache.pulsar.common.naming.TopicName;
> +import org.apache.pulsar.common.naming.SystemTopicNames;
> +import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
> import org.apache.pulsar.common.policies.data.ClusterData;
> import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> import org.testng.Assert;
> import org.testng.annotations.AfterMethod;
> import org.testng.annotations.BeforeMethod;
> import org.testng.annotations.Test;
> -
> import java.util.UUID;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicInteger;
> @@ -74,7 +74,9 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
>                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
>         admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
>         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
> -        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
> +        pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
> +                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
> +                        new PartitionedTopicMetadata(1));
>     }
> 
>     @AfterMethod(alwaysRun = true)
>