You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/02 17:16:49 UTC

[pulsar] branch master updated: Add base class for PersistentTopic and NonPersistentTopic (#4438)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f2dfc1  Add base class for PersistentTopic and NonPersistentTopic (#4438)
5f2dfc1 is described below

commit 5f2dfc1fd73a937f505a25a74fa9edd6a5a0e324
Author: Like <ke...@outlook.com>
AuthorDate: Mon Jun 3 01:16:43 2019 +0800

    Add base class for PersistentTopic and NonPersistentTopic (#4438)
    
    * Add base class for PersistentTopic and NonPersistentTopic
    
    * Fix test
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  10 +-
 .../pulsar/broker/service/AbstractTopic.java       | 203 +++++++++++++++++++++
 .../nonpersistent/NonPersistentReplicator.java     |   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  | 161 +---------------
 .../PersistentDispatcherSingleActiveConsumer.java  |   2 +-
 .../service/persistent/PersistentReplicator.java   |   2 +-
 .../broker/service/persistent/PersistentTopic.java | 176 ++----------------
 .../broker/admin/AdminApiSchemaAutoUpdateTest.java |   6 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   2 +-
 9 files changed, 232 insertions(+), 332 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2d2b7bd..b7302f7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -769,7 +769,7 @@ public class PersistentTopicsBase extends AdminResource {
             validateAdminAccessForSubscriber(subName, authoritative);
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
             try {
-                if (subName.startsWith(topic.replicatorPrefix)) {
+                if (subName.startsWith(topic.getReplicatorPrefix())) {
                     String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                     PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
                     checkNotNull(repl);
@@ -801,7 +801,7 @@ public class PersistentTopicsBase extends AdminResource {
         validateAdminAccessForSubscriber(subName, authoritative);
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         try {
-            if (subName.startsWith(topic.replicatorPrefix)) {
+            if (subName.startsWith(topic.getReplicatorPrefix())) {
                 String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                 PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
                 checkNotNull(repl);
@@ -1050,13 +1050,13 @@ public class PersistentTopicsBase extends AdminResource {
         PersistentReplicator repl = null;
         PersistentSubscription sub = null;
         Entry entry = null;
-        if (subName.startsWith(topic.replicatorPrefix)) {
+        if (subName.startsWith(topic.getReplicatorPrefix())) {
             repl = getReplicatorReference(subName, topic);
         } else {
             sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
         }
         try {
-            if (subName.startsWith(topic.replicatorPrefix)) {
+            if (subName.startsWith(topic.getReplicatorPrefix())) {
                 entry = repl.peekNthMessage(messagePosition).get();
             } else {
                 entry = sub.peekNthMessage(messagePosition).get();
@@ -1200,7 +1200,7 @@ public class PersistentTopicsBase extends AdminResource {
             }
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
             try {
-                if (subName.startsWith(topic.replicatorPrefix)) {
+                if (subName.startsWith(topic.getReplicatorPrefix())) {
                     String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                     PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
                     checkNotNull(repl);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
new file mode 100644
index 0000000..31b27f7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.base.MoreObjects;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+
+public abstract class AbstractTopic implements Topic {
+    private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
+
+    protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
+
+    protected final String topic;
+
+    // Producers currently connected to this topic
+    protected final ConcurrentOpenHashSet<Producer> producers;
+
+    protected final BrokerService brokerService;
+
+    // Prefix for replication cursors
+    protected final String replicatorPrefix;
+
+    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    protected volatile boolean isFenced;
+
+    // Timestamp of when this topic was last seen active
+    protected volatile long lastActive;
+
+    // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
+    // doesn't support batch-message
+    protected volatile boolean hasBatchMessagePublished = false;
+
+    protected StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
+
+    // Whether messages published must be encrypted or not in this topic
+    protected volatile boolean isEncryptionRequired = false;
+    protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+            SchemaCompatibilityStrategy.FULL;
+    // schema validation enforced flag
+    protected volatile boolean schemaValidationEnforced = false;
+
+    public AbstractTopic(String topic, BrokerService brokerService) {
+        this.topic = topic;
+        this.brokerService = brokerService;
+        this.producers = new ConcurrentOpenHashSet<>(16, 1);
+        this.isFenced = false;
+        this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
+        this.lastActive = System.nanoTime();
+    }
+
+    protected boolean isProducersExceeded() {
+        Policies policies;
+        try {
+            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
+                    .orElseGet(() -> new Policies());
+        } catch (Exception e) {
+            policies = new Policies();
+        }
+        final int maxProducers = policies.max_producers_per_topic > 0 ?
+                policies.max_producers_per_topic :
+                brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        if (maxProducers > 0 && maxProducers <= producers.size()) {
+            return true;
+        }
+        return false;
+    }
+
+    protected boolean hasLocalProducers() {
+        AtomicBoolean foundLocal = new AtomicBoolean(false);
+        producers.forEach(producer -> {
+            if (!producer.isRemote()) {
+                foundLocal.set(true);
+            }
+        });
+
+        return foundLocal.get();
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("topic", topic).toString();
+    }
+
+    @Override
+    public ConcurrentOpenHashSet<Producer> getProducers() {
+        return producers;
+    }
+
+
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
+
+    @Override
+    public String getName() {
+        return topic;
+    }
+
+    @Override
+    public boolean isEncryptionRequired() {
+        return isEncryptionRequired;
+    }
+
+    @Override
+    public boolean getSchemaValidationEnforced() {
+        return schemaValidationEnforced;
+    }
+
+    public void markBatchMessagePublished() {
+        this.hasBatchMessagePublished = true;
+    }
+
+    public String getReplicatorPrefix() {
+        return replicatorPrefix;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> hasSchema() {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+                .getSchemaRegistryService()
+                .getSchema(id).thenApply(Objects::nonNull);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        if (schema == null) {
+            return CompletableFuture.completedFuture(SchemaVersion.Empty);
+        }
+
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+                .getSchemaRegistryService()
+                .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> deleteSchema() {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
+        return schemaRegistryService.getSchema(id)
+                .thenCompose(schema -> {
+                    if (schema != null) {
+                        return schemaRegistryService.deleteSchema(id, "");
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+                .getSchemaRegistryService()
+                .isCompatible(id, schema, schemaCompatibilityStrategy);
+    }
+
+    @Override
+    public void recordAddLatency(long latencyUSec) {
+        addEntryLatencyStatsUsec.addValue(latencyUSec);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 10e16ff..b6ea53a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -51,7 +51,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
 
     public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
             BrokerService brokerService) throws NamingException {
-        super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
+        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
 
         producerBuilder.blockIfQueueFull(false);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3499afc..c8779d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -20,11 +20,9 @@ package org.apache.pulsar.broker.service.nonpersistent;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import com.carrotsearch.hppc.ObjectObjectHashMap;
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -39,13 +37,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -65,7 +61,6 @@ import org.apache.pulsar.broker.service.StreamingStats;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -83,54 +78,29 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.Curso
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.schema.SchemaData;
-import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NonPersistentTopic implements Topic {
-    private final String topic;
-
-    // Producers currently connected to this topic
-    private final ConcurrentOpenHashSet<Producer> producers;
+public class NonPersistentTopic extends AbstractTopic implements Topic {
 
     // Subscriptions to this topic
     private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
 
     private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
 
-    private final BrokerService brokerService;
-
-    private volatile boolean isFenced;
-
-    // Prefix for replication cursors
-    public final String replicatorPrefix;
-
     protected static final AtomicLongFieldUpdater<NonPersistentTopic> USAGE_COUNT_UPDATER = AtomicLongFieldUpdater
             .newUpdater(NonPersistentTopic.class, "usageCount");
     private volatile long usageCount = 0;
 
-    private final OrderedExecutor executor;
-
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-    // Timestamp of when this topic was last seen active
-    private volatile long lastActive;
-
-    // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
-    // doesn't support batch-message
-    private volatile boolean hasBatchMessagePublished = false;
     // Ever increasing counter of entries added
-    static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater
+    private static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater
             .newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
     private volatile long entriesAddedCounter = 0;
-    private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
-    private StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
 
     private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
         @Override
@@ -139,13 +109,6 @@ public class NonPersistentTopic implements Topic {
         }
     };
 
-    // Whether messages published must be encrypted or not in this topic
-    private volatile boolean isEncryptionRequired = false;
-    private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
-        SchemaCompatibilityStrategy.FULL;
-    // schema validation enforced flag
-    private volatile boolean schemaValidationEnforced = false;
-
     private static class TopicStats {
         public double averageMsgSize;
         public double aggMsgRateIn;
@@ -170,18 +133,12 @@ public class NonPersistentTopic implements Topic {
     }
 
     public NonPersistentTopic(String topic, BrokerService brokerService) {
-        this.topic = topic;
-        this.brokerService = brokerService;
-        this.producers = new ConcurrentOpenHashSet<Producer>(16, 1);
+        super(topic, brokerService);
         this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
         this.replicators = new ConcurrentOpenHashMap<>(16, 1);
         this.isFenced = false;
-        this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
-        this.executor = brokerService.getTopicOrderedExecutor();
         USAGE_COUNT_UPDATER.set(this, 0);
 
-        this.lastActive = System.nanoTime();
-
         try {
             Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                     .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
@@ -265,40 +222,11 @@ public class NonPersistentTopic implements Topic {
         }
     }
 
-    private boolean isProducersExceeded() {
-        Policies policies;
-        try {
-            policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
-                    .orElseGet(() -> new Policies());
-        } catch (Exception e) {
-            policies = new Policies();
-        }
-        final int maxProducers = policies.max_producers_per_topic > 0 ?
-                policies.max_producers_per_topic :
-                brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
-        if (maxProducers > 0 && maxProducers <= producers.size()) {
-            return true;
-        }
-        return false;
-    }
-
     @Override
     public void checkMessageDeduplicationInfo() {
         // No-op
     }
 
-    private boolean hasLocalProducers() {
-        AtomicBoolean foundLocal = new AtomicBoolean(false);
-        producers.forEach(producer -> {
-            if (!producer.isRemote()) {
-                foundLocal.set(true);
-            }
-        });
-
-        return foundLocal.get();
-    }
-
     @Override
     public void removeProducer(Producer producer) {
         checkArgument(producer.getTopic() == this);
@@ -673,16 +601,6 @@ public class NonPersistentTopic implements Topic {
         // No-op
     }
 
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this).add("topic", topic).toString();
-    }
-
-    @Override
-    public ConcurrentOpenHashSet<Producer> getProducers() {
-        return producers;
-    }
-
     public int getNumberOfConsumers() {
         int count = 0;
         for (NonPersistentSubscription subscription : subscriptions.values()) {
@@ -710,15 +628,6 @@ public class NonPersistentTopic implements Topic {
         return replicators.get(remoteCluster);
     }
 
-    public BrokerService getBrokerService() {
-        return brokerService;
-    }
-
-    @Override
-    public String getName() {
-        return topic;
-    }
-
     public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
             ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
 
@@ -996,14 +905,6 @@ public class NonPersistentTopic implements Topic {
     }
 
     @Override
-    public boolean isEncryptionRequired() {
-        return isEncryptionRequired;
-    }
-
-    @Override
-    public boolean getSchemaValidationEnforced() { return schemaValidationEnforced; }
-
-    @Override
     public boolean isReplicated() {
         return replicators.size() > 1;
     }
@@ -1019,57 +920,8 @@ public class NonPersistentTopic implements Topic {
         throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic");
     }
 
-    public void markBatchMessagePublished() {
-        this.hasBatchMessagePublished = true;
-    }
-
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
 
-    @Override
-    public CompletableFuture<Boolean> hasSchema() {
-        String base = TopicName.get(getName()).getPartitionedTopicName();
-        String id = TopicName.get(base).getSchemaName();
-        return brokerService.pulsar()
-            .getSchemaRegistryService()
-            .getSchema(id).thenApply((schema) -> schema != null);
-    }
-
-    @Override
-    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
-        if (schema == null) {
-            return CompletableFuture.completedFuture(SchemaVersion.Empty);
-        }
-
-        String base = TopicName.get(getName()).getPartitionedTopicName();
-        String id = TopicName.get(base).getSchemaName();
-        return brokerService.pulsar()
-            .getSchemaRegistryService()
-            .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
-    }
-
-    @Override
-    public CompletableFuture<SchemaVersion> deleteSchema() {
-        String base = TopicName.get(getName()).getPartitionedTopicName();
-        String id = TopicName.get(base).getSchemaName();
-        SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
-        return schemaRegistryService.getSchema(id)
-                .thenCompose(schema -> {
-                    if (schema != null) {
-                        return schemaRegistryService.deleteSchema(id, "");
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
-    }
-
-    @Override
-    public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
-        String base = TopicName.get(getName()).getPartitionedTopicName();
-        String id = TopicName.get(base).getSchemaName();
-        return brokerService.pulsar()
-            .getSchemaRegistryService()
-            .isCompatible(id, schema, schemaCompatibilityStrategy);
-    }
 
     @Override
     public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
@@ -1082,9 +934,4 @@ public class NonPersistentTopic implements Topic {
                     }
                 });
     }
-    
-    @Override
-    public void recordAddLatency(long latencyUSec) {
-        addEntryLatencyStatsUsec.addValue(latencyUSec);
-    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index e0b5616..75d2b49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -421,7 +421,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             }
             havePendingRead = true;
             if (consumer.readCompacted()) {
-                topic.compactedTopic.asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
+                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
             } else {
                 cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 01c547e..dc5752e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -98,7 +98,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
 
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
             BrokerService brokerService) throws NamingException {
-        super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
+        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
         this.topic = topic;
         this.cursor = cursor;
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f60302e..a1ed823 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
@@ -35,7 +34,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -56,9 +54,9 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
@@ -83,7 +81,6 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -107,12 +104,10 @@ import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.schema.SchemaData;
-import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.compaction.Compactor;
@@ -123,7 +118,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.carrotsearch.hppc.ObjectObjectHashMap;
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -131,46 +125,25 @@ import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
 
-public class PersistentTopic implements Topic, AddEntryCallback {
-    private final String topic;
+public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {
 
     // Managed ledger associated with the topic
     private final ManagedLedger ledger;
 
-    // Producers currently connected to this topic
-    private final ConcurrentOpenHashSet<Producer> producers;
-
     // Subscriptions to this topic
     private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
 
     private final ConcurrentOpenHashMap<String, Replicator> replicators;
 
-    private final BrokerService brokerService;
-
-    private volatile boolean isFenced;
-
     protected static final AtomicLongFieldUpdater<PersistentTopic> USAGE_COUNT_UPDATER =
             AtomicLongFieldUpdater.newUpdater(PersistentTopic.class, "usageCount");
     @SuppressWarnings("unused")
     private volatile long usageCount = 0;
 
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-    // Prefix for replication cursors
-    public final String replicatorPrefix;
-
     static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
 
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
 
-    private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
-
-    // Timestamp of when this topic was last seen active
-    private volatile long lastActive;
-
-    // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
-    // doesn't support batch-message
-    private volatile boolean hasBatchMessagePublished = false;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
     private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
@@ -178,21 +151,12 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     private final MessageDeduplication messageDeduplication;
 
     private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
-    CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
-    final CompactedTopic compactedTopic;
+    private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
+    private final CompactedTopic compactedTopic;
 
-    CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
+    private CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
             (MessageIdImpl)MessageId.earliest);
 
-    // Whether messages published must be encrypted or not in this topic
-    private volatile boolean isEncryptionRequired = false;
-    private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
-        SchemaCompatibilityStrategy.FULL;
-    // schema validation enforced flag
-    private volatile boolean schemaValidationEnforced = false;
-    private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
-
-
     private volatile Optional<ReplicatedSubscriptionsController> replicatedSubscriptionsController = Optional.empty();
 
     private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
@@ -211,7 +175,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats;
 
         public TopicStatsHelper() {
-            remotePublishersStats = new ObjectObjectHashMap<String, PublisherStats>();
+            remotePublishersStats = new ObjectObjectHashMap<>();
             reset();
         }
 
@@ -226,14 +190,10 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     }
 
     public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws NamingException {
-        this.topic = topic;
+        super(topic, brokerService);
         this.ledger = ledger;
-        this.brokerService = brokerService;
-        this.producers = new ConcurrentOpenHashSet<Producer>(16, 1);
         this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
         this.replicators = new ConcurrentOpenHashMap<>(16, 1);
-        this.isFenced = false;
-        this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
         USAGE_COUNT_UPDATER.set(this, 0);
 
         initializeDispatchRateLimiterIfNeeded(Optional.empty());
@@ -261,8 +221,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                 subscriptions.get(subscriptionName).deactivateCursor();
             }
         }
-        this.lastActive = System.nanoTime();
-
         this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);
 
         try {
@@ -411,35 +369,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         }
     }
 
-    private boolean isProducersExceeded() {
-        Policies policies;
-        try {
-            policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
-                    .orElseGet(() -> new Policies());
-        } catch (Exception e) {
-            policies = new Policies();
-        }
-        final int maxProducers = policies.max_producers_per_topic > 0 ?
-                policies.max_producers_per_topic :
-                brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
-        if (maxProducers > 0 && maxProducers <= producers.size()) {
-            return true;
-        }
-        return false;
-    }
-
-    private boolean hasLocalProducers() {
-        AtomicBoolean foundLocal = new AtomicBoolean(false);
-        producers.forEach(producer -> {
-            if (!producer.isRemote()) {
-                foundLocal.set(true);
-            }
-        });
-
-        return foundLocal.get();
-    }
-
     private boolean hasRemoteProducers() {
         AtomicBoolean foundRemote = new AtomicBoolean(false);
         producers.forEach(producer -> {
@@ -1205,16 +1134,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         return messageDeduplication.isEnabled();
     }
 
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this).add("topic", topic).toString();
-    }
-
-    @Override
-    public ConcurrentOpenHashSet<Producer> getProducers() {
-        return producers;
-    }
-
     public int getNumberOfConsumers() {
         int count = 0;
         for (PersistentSubscription subscription : subscriptions.values()) {
@@ -1232,10 +1151,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         return subscriptions.get(subscriptionName);
     }
 
-    public BrokerService getBrokerService() {
-        return brokerService;
-    }
-
     public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
         return replicators;
     }
@@ -1244,11 +1159,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         return replicators.get(remoteCluster);
     }
 
-    @Override
-    public String getName() {
-        return topic;
-    }
-
     public ManagedLedger getManagedLedger() {
         return ledger;
     }
@@ -1658,7 +1568,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                 // Negative retention time means the topic should be retained indefinitely,
                 // because its own data has to be retained
                 return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
-            }).orElse(false).booleanValue();
+            }).orElse(false);
         } catch (Exception e) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error getting policies", topic);
@@ -1750,14 +1660,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     }
 
     @Override
-    public boolean isEncryptionRequired() {
-        return isEncryptionRequired;
-    }
-
-    @Override
-    public boolean getSchemaValidationEnforced() { return schemaValidationEnforced; }
-
-    @Override
     public boolean isReplicated() {
         return !replicators.isEmpty();
     }
@@ -1851,10 +1753,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
     }
 
-    public void markBatchMessagePublished() {
-        this.hasBatchMessagePublished = true;
-    }
-
     public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
         return this.dispatchRateLimiter;
     }
@@ -1946,52 +1844,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
 
     @Override
-    public CompletableFuture<Boolean> hasSchema() {
-        String base = TopicName.get(getName()).getPartitionedTopicName();
-        String id = TopicName.get(base).getSchemaName();
-        return brokerService.pulsar()
-            .getSchemaRegistryService()
-            .getSchema(id).thenApply((schema) -> schema != null);
-    }
-
-    @Override
-    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
-        if (schema == null) {
-            return CompletableFuture.completedFuture(SchemaVersion.Empty);
-        }
-
-        String base = TopicName.get(getName()).getPartitionedTopicName();
-        String id = TopicName.get(base).getSchemaName();
-        return brokerService.pulsar()
-            .getSchemaRegistryService()
-            .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
-    }
-
-    @Override
-    public CompletableFuture<SchemaVersion> deleteSchema() {
-        String base = TopicName.get(getName()).getPartitionedTopicName();
-        String id = TopicName.get(base).getSchemaName();
-        SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
-        return schemaRegistryService.getSchema(id)
-                .thenCompose(schema -> {
-                    if (schema != null) {
-                        return schemaRegistryService.deleteSchema(id, "");
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
-    }
-
-    @Override
-    public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
-        String base = TopicName.get(getName()).getPartitionedTopicName();
-        String id = TopicName.get(base).getSchemaName();
-        return brokerService.pulsar()
-            .getSchemaRegistryService()
-            .isCompatible(id, schema, schemaCompatibilityStrategy);
-    }
-
-    @Override
     public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
         return hasSchema()
             .thenCompose((hasSchema) -> {
@@ -2003,12 +1855,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
                 });
     }
 
-
-    @Override
-    public void recordAddLatency(long latencyUSec) {
-        addEntryLatencyStatsUsec.addValue(latencyUSec);
-    }
-
     private synchronized void checkReplicatedSubscriptionControllerState() {
         AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
         subscriptions.forEach((name, subscription) -> {
@@ -2047,5 +1893,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         }
 
         ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);;
-     }
+    }
+
+    public CompactedTopic getCompactedTopic() {
+        return compactedTopic;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
index dc7a9ac..5af5c7f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
@@ -184,10 +184,10 @@ public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest {
 
         for (int i = 0; i < 100; i++) {
             Topic t = pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
-            // get around fact that field is private and topic can be persisent or non-persistent
-            Field strategy = t.getClass().getDeclaredField("schemaCompatibilityStrategy");
+            // get around fact that field is private and topic can be persistent or non-persistent
+            Field strategy = t.getClass().getSuperclass().getDeclaredField("schemaCompatibilityStrategy");
             strategy.setAccessible(true);
-            if (((SchemaCompatibilityStrategy)strategy.get(t)) == SchemaCompatibilityStrategy.FULL) {
+            if (strategy.get(t) == SchemaCompatibilityStrategy.FULL) {
                 break;
             }
             Thread.sleep(100);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index b9cef67..556f77b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1189,7 +1189,7 @@ public class PersistentTopicTest {
         doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
 
         PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
-        String remoteReplicatorName = topic.replicatorPrefix + "." + remoteCluster;
+        String remoteReplicatorName = topic.getReplicatorPrefix() + "." + remoteCluster;
         ConcurrentOpenHashMap<String, Replicator> replicatorMap = topic.getReplicators();
 
         final URL brokerUrl = new URL(