You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/08/01 10:05:13 UTC
[pulsar] branch branch-2.8 updated: [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043) (#16867)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 08bbcf388d9 [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043) (#16867)
08bbcf388d9 is described below
commit 08bbcf388d92f09973871747b1266a087ec679af
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Aug 1 18:05:07 2022 +0800
[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043) (#16867)
(cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
In addition to #16043, this PR fixes https://github.com/apache/pulsar/issues/16861
---
.../pulsar/broker/resources/BaseResources.java | 12 ++-
.../broker/resources/NamespaceResources.java | 17 +++-
.../pulsar/broker/resources/PulsarResources.java | 2 +
.../pulsar/broker/resources/TopicResources.java | 53 ++++++++++++
.../pulsar/broker/service/BrokerService.java | 97 ++++++++++++++++------
.../pulsar/common/naming/SystemTopicNames.java | 70 ++++++++++++++++
6 files changed, 225 insertions(+), 26 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 8016bcef314..5b195d9dcb1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+
+import com.google.common.base.Joiner;
import lombok.Getter;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -38,6 +40,8 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
*/
public class BaseResources<T> {
+ protected static final String BASE_POLICIES_PATH = "/admin/policies";
+
@Getter
private final MetadataStoreExtended store;
@Getter
@@ -164,4 +168,10 @@ public class BaseResources<T> {
public CompletableFuture<Boolean> existsAsync(String path) {
return cache.exists(path);
}
-}
\ No newline at end of file
+
+ protected static String joinPath(String... parts) {
+ StringBuilder sb = new StringBuilder();
+ Joiner.on('/').appendTo(sb, parts);
+ return sb.toString();
+ }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 58d493ee171..f4d876d2534 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -21,8 +21,12 @@ package org.apache.pulsar.broker.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
import lombok.Getter;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.Policies;
@@ -43,6 +47,10 @@ public class NamespaceResources extends BaseResources<Policies> {
partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
}
+ public CompletableFuture<Optional<Policies>> getPoliciesAsync(NamespaceName ns) {
+ return getCache().get(joinPath(BASE_POLICIES_PATH, ns.toString()));
+ }
+
public static class IsolationPolicyResources extends BaseResources<Map<String, NamespaceIsolationDataImpl>> {
public IsolationPolicyResources(MetadataStoreExtended store, int operationTimeoutSec) {
super(store, new TypeReference<Map<String, NamespaceIsolationDataImpl>>() {
@@ -56,8 +64,15 @@ public class NamespaceResources extends BaseResources<Policies> {
}
public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> {
+ private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics";
+
public PartitionedTopicResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) {
super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec);
}
+
+ public CompletableFuture<Void> createPartitionedTopicAsync(TopicName tn, PartitionedTopicMetadata tm) {
+ return createAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+ tn.getEncodedLocalName()), tm);
+ }
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fa4853a22c5..5209a795acf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -38,6 +38,7 @@ public class PulsarResources {
private DynamicConfigurationResources dynamicConfigResources;
private LocalPoliciesResources localPolicies;
private LoadManagerReportResources loadReportResources;
+ private TopicResources topicResources;
private Optional<MetadataStoreExtended> localMetadataStore;
private Optional<MetadataStoreExtended> configurationMetadataStore;
@@ -56,6 +57,7 @@ public class PulsarResources {
dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
localPolicies = new LocalPoliciesResources(localMetadataStore, operationTimeoutSec);
loadReportResources = new LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
+ topicResources = new TopicResources(localMetadataStore);
}
this.localMetadataStore = Optional.ofNullable(localMetadataStore);
this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
new file mode 100644
index 00000000000..d25b308d086
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.pulsar.common.util.Codec.decode;
+
+public class TopicResources {
+ private static final String MANAGED_LEDGER_PATH = "/managed-ledgers";
+
+ private final MetadataStore store;
+
+ public TopicResources(MetadataStore store) {
+ this.store = store;
+ }
+
+ public CompletableFuture<List<String>> getExistingPartitions(TopicName topic) {
+ return getExistingPartitions(topic.getNamespaceObject(), topic.getDomain());
+ }
+
+ public CompletableFuture<List<String>> getExistingPartitions(NamespaceName ns, TopicDomain domain) {
+ String topicPartitionPath = MANAGED_LEDGER_PATH + "/" + ns + "/" + domain;
+ return store.getChildren(topicPartitionPath).thenApply(topics ->
+ topics.stream()
+ .map(s -> String.format("%s://%s/%s", domain.value(), ns, decode(s)))
+ .collect(Collectors.toList())
+ );
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bdaded637b6..e91e462f7df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -141,6 +141,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -2412,16 +2413,39 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
.thenCompose(topicExists -> {
return fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
+ CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
+
// If topic is already exist, creating partitioned topic is not allowed.
if (metadata.partitions == 0
&& !topicExists
&& !topicName.isPartitioned()
&& pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
&& pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
- return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+
+ pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
+ .thenAccept(md -> future.complete(md))
+ .exceptionally(ex -> {
+ if (ex.getCause()
+ instanceof MetadataStoreException.AlreadyExistsException) {
+ // The partitioned topic might be created concurrently
+ fetchPartitionedTopicMetadataAsync(topicName)
+ .whenComplete((metadata2, ex2) -> {
+ if (ex2 == null) {
+ future.complete(metadata2);
+ } else {
+ future.completeExceptionally(ex2);
+ }
+ });
+ } else {
+ future.completeExceptionally(ex);
+ }
+ return null;
+ });
} else {
- return CompletableFuture.completedFuture(metadata);
+ future.complete(metadata);
}
+
+ return future;
});
});
}
@@ -2436,28 +2460,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
"Number of partitions should be less than or equal to " + maxPartitions);
PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
- CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = futureWithDeadline();
-
- if (!checkMaxTopicsPerNamespace(topicName, defaultNumPartitions, partitionedTopicFuture)) {
- return partitionedTopicFuture;
- }
-
- try {
- PartitionedTopicResources partitionResources = pulsar.getPulsarResources().getNamespaceResources()
- .getPartitionedTopicResources();
- partitionResources.createAsync(partitionedTopicPath(topicName), configMetadata).thenAccept((r) -> {
- log.info("partitioned metadata successfully created for {}", topicName);
- partitionedTopicFuture.complete(configMetadata);
- }).exceptionally(ex -> {
- partitionedTopicFuture.completeExceptionally(ex.getCause());
- return null;
- });
- } catch (Exception e) {
- log.error("Failed to create default partitioned topic.", e);
- return FutureUtil.failedFuture(e);
- }
- return partitionedTopicFuture;
+ return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions)
+ .thenCompose(__ -> {
+ PartitionedTopicResources partitionResources = pulsar.getPulsarResources().getNamespaceResources()
+ .getPartitionedTopicResources();
+ return partitionResources.createPartitionedTopicAsync(topicName, configMetadata)
+ .thenApply(v -> {
+ log.info("partitioned metadata successfully created for {}", topicName);
+ return configMetadata;
+ });
+ });
}
public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) {
@@ -2727,6 +2740,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return SystemTopicClient.isSystemTopic(TopicName.get(topic));
}
+ public boolean isSystemTopic(TopicName topicName) {
+ return NamespaceService.isSystemServiceNamespace(topicName.getNamespace())
+ || SystemTopicNames.isSystemTopic(topicName);
+ }
+
/**
* Get {@link TopicPolicies} for the parameterized topic.
* @param topicName
@@ -2744,8 +2762,39 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
+ private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
+ return pulsar.getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(topicName.getNamespaceObject())
+ .thenCompose(optPolicies -> {
+ int maxTopicsPerNamespace = optPolicies.map(p -> p.max_topics_per_namespace)
+ .orElse(pulsar.getConfig().getMaxTopicsPerNamespace());
+
+ if (maxTopicsPerNamespace > 0 && !isSystemTopic(topicName)) {
+ return pulsar().getPulsarResources().getTopicResources()
+ .getExistingPartitions(topicName)
+ .thenCompose(topics -> {
+ // exclude created system topic
+ long topicsCount = topics.stream()
+ .filter(t -> !isSystemTopic(TopicName.get(t)))
+ .count();
+ if (topicsCount + numPartitions > maxTopicsPerNamespace) {
+ log.error("Failed to create persistent topic {}, "
+ + "exceed maximum number of topics in namespace", topicName);
+ return FutureUtil.failedFuture(
+ new RestException(Response.Status.PRECONDITION_FAILED,
+ "Exceed maximum number of topics in namespace."));
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
+
private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions,
- CompletableFuture<T> topicFuture) {
+ CompletableFuture<T> topicFuture) {
Integer maxTopicsPerNamespace;
try {
maxTopicsPerNamespace = pulsar.getConfigurationCache().policiesCache()
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
new file mode 100644
index 00000000000..72ec16752c6
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Encapsulate the parsing of the completeTopicName name.
+ */
+public class SystemTopicNames {
+
+ /**
+ * Local topic name for the namespace events.
+ */
+ public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events";
+
+ /**
+ * Local topic name for the transaction buffer snapshot.
+ */
+ public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
+
+
+ public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
+
+ /**
+ * The set of all local topic names declared above.
+ */
+ public static final Set<String> EVENTS_TOPIC_NAMES =
+ Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT));
+
+ public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
+ NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
+
+ public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
+ NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
+
+ public static boolean isEventSystemTopic(TopicName topicName) {
+ return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
+ }
+
+ public static boolean isTransactionInternalName(TopicName topicName) {
+ String topic = topicName.toString();
+ return topic.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString())
+ || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
+ || topic.endsWith(PENDING_ACK_STORE_SUFFIX);
+ }
+
+ public static boolean isSystemTopic(TopicName topicName) {
+ TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
+ return isEventSystemTopic(nonePartitionedTopicName) || isTransactionInternalName(nonePartitionedTopicName);
+ }
+}