You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/02 17:16:49 UTC
[pulsar] branch master updated: Add base class for PersistentTopic
and NonPersistentTopic (#4438)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5f2dfc1 Add base class for PersistentTopic and NonPersistentTopic (#4438)
5f2dfc1 is described below
commit 5f2dfc1fd73a937f505a25a74fa9edd6a5a0e324
Author: Like <ke...@outlook.com>
AuthorDate: Mon Jun 3 01:16:43 2019 +0800
Add base class for PersistentTopic and NonPersistentTopic (#4438)
* Add base class for PersistentTopic and NonPersistentTopic
* Fix test
---
.../broker/admin/impl/PersistentTopicsBase.java | 10 +-
.../pulsar/broker/service/AbstractTopic.java | 203 +++++++++++++++++++++
.../nonpersistent/NonPersistentReplicator.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 161 +---------------
.../PersistentDispatcherSingleActiveConsumer.java | 2 +-
.../service/persistent/PersistentReplicator.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 176 ++----------------
.../broker/admin/AdminApiSchemaAutoUpdateTest.java | 6 +-
.../pulsar/broker/service/PersistentTopicTest.java | 2 +-
9 files changed, 232 insertions(+), 332 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2d2b7bd..b7302f7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -769,7 +769,7 @@ public class PersistentTopicsBase extends AdminResource {
validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
- if (subName.startsWith(topic.replicatorPrefix)) {
+ if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
@@ -801,7 +801,7 @@ public class PersistentTopicsBase extends AdminResource {
validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
- if (subName.startsWith(topic.replicatorPrefix)) {
+ if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
@@ -1050,13 +1050,13 @@ public class PersistentTopicsBase extends AdminResource {
PersistentReplicator repl = null;
PersistentSubscription sub = null;
Entry entry = null;
- if (subName.startsWith(topic.replicatorPrefix)) {
+ if (subName.startsWith(topic.getReplicatorPrefix())) {
repl = getReplicatorReference(subName, topic);
} else {
sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
}
try {
- if (subName.startsWith(topic.replicatorPrefix)) {
+ if (subName.startsWith(topic.getReplicatorPrefix())) {
entry = repl.peekNthMessage(messagePosition).get();
} else {
entry = sub.peekNthMessage(messagePosition).get();
@@ -1200,7 +1200,7 @@ public class PersistentTopicsBase extends AdminResource {
}
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
- if (subName.startsWith(topic.replicatorPrefix)) {
+ if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
new file mode 100644
index 0000000..31b27f7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.base.MoreObjects;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+
+public abstract class AbstractTopic implements Topic {
+ private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
+
+ protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
+
+ protected final String topic;
+
+ // Producers currently connected to this topic
+ protected final ConcurrentOpenHashSet<Producer> producers;
+
+ protected final BrokerService brokerService;
+
+ // Prefix for replication cursors
+ protected final String replicatorPrefix;
+
+ protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ protected volatile boolean isFenced;
+
+ // Timestamp of when this topic was last seen active
+ protected volatile long lastActive;
+
+ // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
+ // doesn't support batch-message
+ protected volatile boolean hasBatchMessagePublished = false;
+
+ protected StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
+
+ // Whether messages published must be encrypted or not in this topic
+ protected volatile boolean isEncryptionRequired = false;
+ protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+ SchemaCompatibilityStrategy.FULL;
+ // schema validation enforced flag
+ protected volatile boolean schemaValidationEnforced = false;
+
+ public AbstractTopic(String topic, BrokerService brokerService) {
+ this.topic = topic;
+ this.brokerService = brokerService;
+ this.producers = new ConcurrentOpenHashSet<>(16, 1);
+ this.isFenced = false;
+ this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
+ this.lastActive = System.nanoTime();
+ }
+
+ protected boolean isProducersExceeded() {
+ Policies policies;
+ try {
+ policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
+ .orElseGet(() -> new Policies());
+ } catch (Exception e) {
+ policies = new Policies();
+ }
+ final int maxProducers = policies.max_producers_per_topic > 0 ?
+ policies.max_producers_per_topic :
+ brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+ if (maxProducers > 0 && maxProducers <= producers.size()) {
+ return true;
+ }
+ return false;
+ }
+
+ protected boolean hasLocalProducers() {
+ AtomicBoolean foundLocal = new AtomicBoolean(false);
+ producers.forEach(producer -> {
+ if (!producer.isRemote()) {
+ foundLocal.set(true);
+ }
+ });
+
+ return foundLocal.get();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("topic", topic).toString();
+ }
+
+ @Override
+ public ConcurrentOpenHashSet<Producer> getProducers() {
+ return producers;
+ }
+
+
+ public BrokerService getBrokerService() {
+ return brokerService;
+ }
+
+ @Override
+ public String getName() {
+ return topic;
+ }
+
+ @Override
+ public boolean isEncryptionRequired() {
+ return isEncryptionRequired;
+ }
+
+ @Override
+ public boolean getSchemaValidationEnforced() {
+ return schemaValidationEnforced;
+ }
+
+ public void markBatchMessagePublished() {
+ this.hasBatchMessagePublished = true;
+ }
+
+ public String getReplicatorPrefix() {
+ return replicatorPrefix;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasSchema() {
+ String base = TopicName.get(getName()).getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ return brokerService.pulsar()
+ .getSchemaRegistryService()
+ .getSchema(id).thenApply(Objects::nonNull);
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+ if (schema == null) {
+ return CompletableFuture.completedFuture(SchemaVersion.Empty);
+ }
+
+ String base = TopicName.get(getName()).getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ return brokerService.pulsar()
+ .getSchemaRegistryService()
+ .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> deleteSchema() {
+ String base = TopicName.get(getName()).getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
+ return schemaRegistryService.getSchema(id)
+ .thenCompose(schema -> {
+ if (schema != null) {
+ return schemaRegistryService.deleteSchema(id, "");
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
+ String base = TopicName.get(getName()).getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ return brokerService.pulsar()
+ .getSchemaRegistryService()
+ .isCompatible(id, schema, schemaCompatibilityStrategy);
+ }
+
+ @Override
+ public void recordAddLatency(long latencyUSec) {
+ addEntryLatencyStatsUsec.addValue(latencyUSec);
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 10e16ff..b6ea53a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -51,7 +51,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
- super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
+ super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
producerBuilder.blockIfQueueFull(false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3499afc..c8779d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -20,11 +20,9 @@ package org.apache.pulsar.broker.service.nonpersistent;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.carrotsearch.hppc.ObjectObjectHashMap;
-import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -39,13 +37,11 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -65,7 +61,6 @@ import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
@@ -83,54 +78,29 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.Curso
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.schema.SchemaData;
-import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NonPersistentTopic implements Topic {
- private final String topic;
-
- // Producers currently connected to this topic
- private final ConcurrentOpenHashSet<Producer> producers;
+public class NonPersistentTopic extends AbstractTopic implements Topic {
// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
- private final BrokerService brokerService;
-
- private volatile boolean isFenced;
-
- // Prefix for replication cursors
- public final String replicatorPrefix;
-
protected static final AtomicLongFieldUpdater<NonPersistentTopic> USAGE_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(NonPersistentTopic.class, "usageCount");
private volatile long usageCount = 0;
- private final OrderedExecutor executor;
-
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
- // Timestamp of when this topic was last seen active
- private volatile long lastActive;
-
- // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
- // doesn't support batch-message
- private volatile boolean hasBatchMessagePublished = false;
// Ever increasing counter of entries added
- static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater
+ private static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater
.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
private volatile long entriesAddedCounter = 0;
- private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
- private StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
@@ -139,13 +109,6 @@ public class NonPersistentTopic implements Topic {
}
};
- // Whether messages published must be encrypted or not in this topic
- private volatile boolean isEncryptionRequired = false;
- private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
- SchemaCompatibilityStrategy.FULL;
- // schema validation enforced flag
- private volatile boolean schemaValidationEnforced = false;
-
private static class TopicStats {
public double averageMsgSize;
public double aggMsgRateIn;
@@ -170,18 +133,12 @@ public class NonPersistentTopic implements Topic {
}
public NonPersistentTopic(String topic, BrokerService brokerService) {
- this.topic = topic;
- this.brokerService = brokerService;
- this.producers = new ConcurrentOpenHashSet<Producer>(16, 1);
+ super(topic, brokerService);
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.isFenced = false;
- this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
- this.executor = brokerService.getTopicOrderedExecutor();
USAGE_COUNT_UPDATER.set(this, 0);
- this.lastActive = System.nanoTime();
-
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
@@ -265,40 +222,11 @@ public class NonPersistentTopic implements Topic {
}
}
- private boolean isProducersExceeded() {
- Policies policies;
- try {
- policies = brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
- .orElseGet(() -> new Policies());
- } catch (Exception e) {
- policies = new Policies();
- }
- final int maxProducers = policies.max_producers_per_topic > 0 ?
- policies.max_producers_per_topic :
- brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
- if (maxProducers > 0 && maxProducers <= producers.size()) {
- return true;
- }
- return false;
- }
-
@Override
public void checkMessageDeduplicationInfo() {
// No-op
}
- private boolean hasLocalProducers() {
- AtomicBoolean foundLocal = new AtomicBoolean(false);
- producers.forEach(producer -> {
- if (!producer.isRemote()) {
- foundLocal.set(true);
- }
- });
-
- return foundLocal.get();
- }
-
@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
@@ -673,16 +601,6 @@ public class NonPersistentTopic implements Topic {
// No-op
}
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this).add("topic", topic).toString();
- }
-
- @Override
- public ConcurrentOpenHashSet<Producer> getProducers() {
- return producers;
- }
-
public int getNumberOfConsumers() {
int count = 0;
for (NonPersistentSubscription subscription : subscriptions.values()) {
@@ -710,15 +628,6 @@ public class NonPersistentTopic implements Topic {
return replicators.get(remoteCluster);
}
- public BrokerService getBrokerService() {
- return brokerService;
- }
-
- @Override
- public String getName() {
- return topic;
- }
-
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
@@ -996,14 +905,6 @@ public class NonPersistentTopic implements Topic {
}
@Override
- public boolean isEncryptionRequired() {
- return isEncryptionRequired;
- }
-
- @Override
- public boolean getSchemaValidationEnforced() { return schemaValidationEnforced; }
-
- @Override
public boolean isReplicated() {
return replicators.size() > 1;
}
@@ -1019,57 +920,8 @@ public class NonPersistentTopic implements Topic {
throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic");
}
- public void markBatchMessagePublished() {
- this.hasBatchMessagePublished = true;
- }
-
private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
- @Override
- public CompletableFuture<Boolean> hasSchema() {
- String base = TopicName.get(getName()).getPartitionedTopicName();
- String id = TopicName.get(base).getSchemaName();
- return brokerService.pulsar()
- .getSchemaRegistryService()
- .getSchema(id).thenApply((schema) -> schema != null);
- }
-
- @Override
- public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
- if (schema == null) {
- return CompletableFuture.completedFuture(SchemaVersion.Empty);
- }
-
- String base = TopicName.get(getName()).getPartitionedTopicName();
- String id = TopicName.get(base).getSchemaName();
- return brokerService.pulsar()
- .getSchemaRegistryService()
- .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
- }
-
- @Override
- public CompletableFuture<SchemaVersion> deleteSchema() {
- String base = TopicName.get(getName()).getPartitionedTopicName();
- String id = TopicName.get(base).getSchemaName();
- SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
- return schemaRegistryService.getSchema(id)
- .thenCompose(schema -> {
- if (schema != null) {
- return schemaRegistryService.deleteSchema(id, "");
- } else {
- return CompletableFuture.completedFuture(null);
- }
- });
- }
-
- @Override
- public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
- String base = TopicName.get(getName()).getPartitionedTopicName();
- String id = TopicName.get(base).getSchemaName();
- return brokerService.pulsar()
- .getSchemaRegistryService()
- .isCompatible(id, schema, schemaCompatibilityStrategy);
- }
@Override
public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
@@ -1082,9 +934,4 @@ public class NonPersistentTopic implements Topic {
}
});
}
-
- @Override
- public void recordAddLatency(long latencyUSec) {
- addEntryLatencyStatsUsec.addValue(latencyUSec);
- }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index e0b5616..75d2b49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -421,7 +421,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
}
havePendingRead = true;
if (consumer.readCompacted()) {
- topic.compactedTopic.asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 01c547e..dc5752e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -98,7 +98,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException {
- super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
+ super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
this.topic = topic;
this.cursor = cursor;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f60302e..a1ed823 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
@@ -35,7 +34,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -56,9 +54,9 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
@@ -83,7 +81,6 @@ import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -107,12 +104,10 @@ import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.schema.SchemaData;
-import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
@@ -123,7 +118,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.carrotsearch.hppc.ObjectObjectHashMap;
-import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -131,46 +125,25 @@ import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
-public class PersistentTopic implements Topic, AddEntryCallback {
- private final String topic;
+public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {
// Managed ledger associated with the topic
private final ManagedLedger ledger;
- // Producers currently connected to this topic
- private final ConcurrentOpenHashSet<Producer> producers;
-
// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
private final ConcurrentOpenHashMap<String, Replicator> replicators;
- private final BrokerService brokerService;
-
- private volatile boolean isFenced;
-
protected static final AtomicLongFieldUpdater<PersistentTopic> USAGE_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(PersistentTopic.class, "usageCount");
@SuppressWarnings("unused")
private volatile long usageCount = 0;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
- // Prefix for replication cursors
- public final String replicatorPrefix;
-
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
- private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
-
- // Timestamp of when this topic was last seen active
- private volatile long lastActive;
-
- // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
- // doesn't support batch-message
- private volatile boolean hasBatchMessagePublished = false;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
@@ -178,21 +151,12 @@ public class PersistentTopic implements Topic, AddEntryCallback {
private final MessageDeduplication messageDeduplication;
private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
- CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
- final CompactedTopic compactedTopic;
+ private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
+ private final CompactedTopic compactedTopic;
- CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
+ private CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
(MessageIdImpl)MessageId.earliest);
- // Whether messages published must be encrypted or not in this topic
- private volatile boolean isEncryptionRequired = false;
- private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
- SchemaCompatibilityStrategy.FULL;
- // schema validation enforced flag
- private volatile boolean schemaValidationEnforced = false;
- private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
-
-
private volatile Optional<ReplicatedSubscriptionsController> replicatedSubscriptionsController = Optional.empty();
private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
@@ -211,7 +175,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats;
public TopicStatsHelper() {
- remotePublishersStats = new ObjectObjectHashMap<String, PublisherStats>();
+ remotePublishersStats = new ObjectObjectHashMap<>();
reset();
}
@@ -226,14 +190,10 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws NamingException {
- this.topic = topic;
+ super(topic, brokerService);
this.ledger = ledger;
- this.brokerService = brokerService;
- this.producers = new ConcurrentOpenHashSet<Producer>(16, 1);
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
- this.isFenced = false;
- this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
USAGE_COUNT_UPDATER.set(this, 0);
initializeDispatchRateLimiterIfNeeded(Optional.empty());
@@ -261,8 +221,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
subscriptions.get(subscriptionName).deactivateCursor();
}
}
- this.lastActive = System.nanoTime();
-
this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);
try {
@@ -411,35 +369,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
}
- private boolean isProducersExceeded() {
- Policies policies;
- try {
- policies = brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
- .orElseGet(() -> new Policies());
- } catch (Exception e) {
- policies = new Policies();
- }
- final int maxProducers = policies.max_producers_per_topic > 0 ?
- policies.max_producers_per_topic :
- brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
- if (maxProducers > 0 && maxProducers <= producers.size()) {
- return true;
- }
- return false;
- }
-
- private boolean hasLocalProducers() {
- AtomicBoolean foundLocal = new AtomicBoolean(false);
- producers.forEach(producer -> {
- if (!producer.isRemote()) {
- foundLocal.set(true);
- }
- });
-
- return foundLocal.get();
- }
-
private boolean hasRemoteProducers() {
AtomicBoolean foundRemote = new AtomicBoolean(false);
producers.forEach(producer -> {
@@ -1205,16 +1134,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
return messageDeduplication.isEnabled();
}
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this).add("topic", topic).toString();
- }
-
- @Override
- public ConcurrentOpenHashSet<Producer> getProducers() {
- return producers;
- }
-
public int getNumberOfConsumers() {
int count = 0;
for (PersistentSubscription subscription : subscriptions.values()) {
@@ -1232,10 +1151,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
return subscriptions.get(subscriptionName);
}
- public BrokerService getBrokerService() {
- return brokerService;
- }
-
public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
return replicators;
}
@@ -1244,11 +1159,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
return replicators.get(remoteCluster);
}
- @Override
- public String getName() {
- return topic;
- }
-
public ManagedLedger getManagedLedger() {
return ledger;
}
@@ -1658,7 +1568,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
// Negative retention time means the topic should be retained indefinitely,
// because its own data has to be retained
return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
- }).orElse(false).booleanValue();
+ }).orElse(false);
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
@@ -1750,14 +1660,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
@Override
- public boolean isEncryptionRequired() {
- return isEncryptionRequired;
- }
-
- @Override
- public boolean getSchemaValidationEnforced() { return schemaValidationEnforced; }
-
- @Override
public boolean isReplicated() {
return !replicators.isEmpty();
}
@@ -1851,10 +1753,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
}
- public void markBatchMessagePublished() {
- this.hasBatchMessagePublished = true;
- }
-
public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
return this.dispatchRateLimiter;
}
@@ -1946,52 +1844,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
@Override
- public CompletableFuture<Boolean> hasSchema() {
- String base = TopicName.get(getName()).getPartitionedTopicName();
- String id = TopicName.get(base).getSchemaName();
- return brokerService.pulsar()
- .getSchemaRegistryService()
- .getSchema(id).thenApply((schema) -> schema != null);
- }
-
- @Override
- public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
- if (schema == null) {
- return CompletableFuture.completedFuture(SchemaVersion.Empty);
- }
-
- String base = TopicName.get(getName()).getPartitionedTopicName();
- String id = TopicName.get(base).getSchemaName();
- return brokerService.pulsar()
- .getSchemaRegistryService()
- .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
- }
-
- @Override
- public CompletableFuture<SchemaVersion> deleteSchema() {
- String base = TopicName.get(getName()).getPartitionedTopicName();
- String id = TopicName.get(base).getSchemaName();
- SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
- return schemaRegistryService.getSchema(id)
- .thenCompose(schema -> {
- if (schema != null) {
- return schemaRegistryService.deleteSchema(id, "");
- } else {
- return CompletableFuture.completedFuture(null);
- }
- });
- }
-
- @Override
- public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
- String base = TopicName.get(getName()).getPartitionedTopicName();
- String id = TopicName.get(base).getSchemaName();
- return brokerService.pulsar()
- .getSchemaRegistryService()
- .isCompatible(id, schema, schemaCompatibilityStrategy);
- }
-
- @Override
public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
@@ -2003,12 +1855,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
});
}
-
- @Override
- public void recordAddLatency(long latencyUSec) {
- addEntryLatencyStatsUsec.addValue(latencyUSec);
- }
-
private synchronized void checkReplicatedSubscriptionControllerState() {
AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
subscriptions.forEach((name, subscription) -> {
@@ -2047,5 +1893,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
}
ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);;
- }
+ }
+
+ public CompactedTopic getCompactedTopic() {
+ return compactedTopic;
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
index dc7a9ac..5af5c7f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java
@@ -184,10 +184,10 @@ public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest {
for (int i = 0; i < 100; i++) {
Topic t = pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
- // get around fact that field is private and topic can be persisent or non-persistent
- Field strategy = t.getClass().getDeclaredField("schemaCompatibilityStrategy");
+ // get around fact that field is private and topic can be persistent or non-persistent
+ Field strategy = t.getClass().getSuperclass().getDeclaredField("schemaCompatibilityStrategy");
strategy.setAccessible(true);
- if (((SchemaCompatibilityStrategy)strategy.get(t)) == SchemaCompatibilityStrategy.FULL) {
+ if (strategy.get(t) == SchemaCompatibilityStrategy.FULL) {
break;
}
Thread.sleep(100);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index b9cef67..556f77b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1189,7 +1189,7 @@ public class PersistentTopicTest {
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
- String remoteReplicatorName = topic.replicatorPrefix + "." + remoteCluster;
+ String remoteReplicatorName = topic.getReplicatorPrefix() + "." + remoteCluster;
ConcurrentOpenHashMap<String, Replicator> replicatorMap = topic.getReplicators();
final URL brokerUrl = new URL(