You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/08/01 10:05:13 UTC

[pulsar] branch branch-2.8 updated: [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043) (#16867)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 08bbcf388d9 [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043) (#16867)
08bbcf388d9 is described below

commit 08bbcf388d92f09973871747b1266a087ec679af
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Aug 1 18:05:07 2022 +0800

    [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043) (#16867)
    
    (cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
    
    In addition to #16043, this PR fixes https://github.com/apache/pulsar/issues/16861
---
 .../pulsar/broker/resources/BaseResources.java     | 12 ++-
 .../broker/resources/NamespaceResources.java       | 17 +++-
 .../pulsar/broker/resources/PulsarResources.java   |  2 +
 .../pulsar/broker/resources/TopicResources.java    | 53 ++++++++++++
 .../pulsar/broker/service/BrokerService.java       | 97 ++++++++++++++++------
 .../pulsar/common/naming/SystemTopicNames.java     | 70 ++++++++++++++++
 6 files changed, 225 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 8016bcef314..5b195d9dcb1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+
+import com.google.common.base.Joiner;
 import lombok.Getter;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -38,6 +40,8 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
  */
 public class BaseResources<T> {
 
+    protected static final String BASE_POLICIES_PATH = "/admin/policies";
+
     @Getter
     private final MetadataStoreExtended store;
     @Getter
@@ -164,4 +168,10 @@ public class BaseResources<T> {
     public CompletableFuture<Boolean> existsAsync(String path) {
         return cache.exists(path);
     }
-}
\ No newline at end of file
+
+    protected static String joinPath(String... parts) {
+        StringBuilder sb = new StringBuilder();
+        Joiner.on('/').appendTo(sb, parts);
+        return sb.toString();
+    }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 58d493ee171..f4d876d2534 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -21,8 +21,12 @@ package org.apache.pulsar.broker.resources;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
 import lombok.Getter;
 
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -43,6 +47,10 @@ public class NamespaceResources extends BaseResources<Policies> {
         partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
     }
 
+    public CompletableFuture<Optional<Policies>> getPoliciesAsync(NamespaceName ns) {
+        return getCache().get(joinPath(BASE_POLICIES_PATH, ns.toString()));
+    }
+
     public static class IsolationPolicyResources extends BaseResources<Map<String, NamespaceIsolationDataImpl>> {
         public IsolationPolicyResources(MetadataStoreExtended store, int operationTimeoutSec) {
             super(store, new TypeReference<Map<String, NamespaceIsolationDataImpl>>() {
@@ -56,8 +64,15 @@ public class NamespaceResources extends BaseResources<Policies> {
     }
 
     public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> {
+        private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics";
+
         public PartitionedTopicResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) {
             super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec);
         }
+
+        public CompletableFuture<Void> createPartitionedTopicAsync(TopicName tn, PartitionedTopicMetadata tm) {
+            return createAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+                    tn.getEncodedLocalName()), tm);
+        }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fa4853a22c5..5209a795acf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -38,6 +38,7 @@ public class PulsarResources {
     private DynamicConfigurationResources dynamicConfigResources;
     private LocalPoliciesResources localPolicies;
     private LoadManagerReportResources loadReportResources;
+    private TopicResources topicResources;
     private Optional<MetadataStoreExtended> localMetadataStore;
     private Optional<MetadataStoreExtended> configurationMetadataStore;
 
@@ -56,6 +57,7 @@ public class PulsarResources {
             dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
             localPolicies = new LocalPoliciesResources(localMetadataStore, operationTimeoutSec);
             loadReportResources = new LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
+            topicResources = new TopicResources(localMetadataStore);
         }
         this.localMetadataStore = Optional.ofNullable(localMetadataStore);
         this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
new file mode 100644
index 00000000000..d25b308d086
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.pulsar.common.util.Codec.decode;
+
+public class TopicResources {
+    private static final String MANAGED_LEDGER_PATH = "/managed-ledgers";
+
+    private final MetadataStore store;
+
+    public TopicResources(MetadataStore store) {
+        this.store = store;
+    }
+
+    public CompletableFuture<List<String>> getExistingPartitions(TopicName topic) {
+        return getExistingPartitions(topic.getNamespaceObject(), topic.getDomain());
+    }
+
+    public CompletableFuture<List<String>> getExistingPartitions(NamespaceName ns, TopicDomain domain) {
+        String topicPartitionPath = MANAGED_LEDGER_PATH + "/" + ns + "/" + domain;
+        return store.getChildren(topicPartitionPath).thenApply(topics ->
+                topics.stream()
+                        .map(s -> String.format("%s://%s/%s", domain.value(), ns, decode(s)))
+                        .collect(Collectors.toList())
+        );
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bdaded637b6..e91e462f7df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -141,6 +141,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -2412,16 +2413,39 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 .thenCompose(topicExists -> {
                     return fetchPartitionedTopicMetadataAsync(topicName)
                             .thenCompose(metadata -> {
+                                CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
+
                                 // If topic is already exist, creating partitioned topic is not allowed.
                                 if (metadata.partitions == 0
                                         && !topicExists
                                         && !topicName.isPartitioned()
                                         && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
                                         && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
-                                    return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+
+                                    pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
+                                            .thenAccept(md -> future.complete(md))
+                                            .exceptionally(ex -> {
+                                                if (ex.getCause()
+                                                        instanceof MetadataStoreException.AlreadyExistsException) {
+                                                    // The partitioned topic might be created concurrently
+                                                    fetchPartitionedTopicMetadataAsync(topicName)
+                                                            .whenComplete((metadata2, ex2) -> {
+                                                                if (ex2 == null) {
+                                                                    future.complete(metadata2);
+                                                                } else {
+                                                                    future.completeExceptionally(ex2);
+                                                                }
+                                                            });
+                                                } else {
+                                                    future.completeExceptionally(ex);
+                                                }
+                                                return null;
+                                            });
                                 } else {
-                                    return CompletableFuture.completedFuture(metadata);
+                                    future.complete(metadata);
                                 }
+
+                                return future;
                             });
                 });
     }
@@ -2436,28 +2460,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 "Number of partitions should be less than or equal to " + maxPartitions);
 
         PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
-        CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = futureWithDeadline();
-
-        if (!checkMaxTopicsPerNamespace(topicName, defaultNumPartitions, partitionedTopicFuture)) {
-            return partitionedTopicFuture;
-        }
-
-        try {
-            PartitionedTopicResources partitionResources = pulsar.getPulsarResources().getNamespaceResources()
-                    .getPartitionedTopicResources();
-            partitionResources.createAsync(partitionedTopicPath(topicName), configMetadata).thenAccept((r) -> {
-                log.info("partitioned metadata successfully created for {}", topicName);
-                partitionedTopicFuture.complete(configMetadata);
-            }).exceptionally(ex -> {
-                partitionedTopicFuture.completeExceptionally(ex.getCause());
-                return null;
-            });
-        } catch (Exception e) {
-            log.error("Failed to create default partitioned topic.", e);
-            return FutureUtil.failedFuture(e);
-        }
 
-        return partitionedTopicFuture;
+        return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions)
+                .thenCompose(__ -> {
+                    PartitionedTopicResources partitionResources = pulsar.getPulsarResources().getNamespaceResources()
+                            .getPartitionedTopicResources();
+                    return partitionResources.createPartitionedTopicAsync(topicName, configMetadata)
+                            .thenApply(v -> {
+                                log.info("partitioned metadata successfully created for {}", topicName);
+                                return configMetadata;
+                            });
+                });
     }
 
     public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) {
@@ -2727,6 +2740,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         return SystemTopicClient.isSystemTopic(TopicName.get(topic));
     }
 
+    public boolean isSystemTopic(TopicName topicName) {
+        return NamespaceService.isSystemServiceNamespace(topicName.getNamespace())
+                || SystemTopicNames.isSystemTopic(topicName);
+    }
+
     /**
      * Get {@link TopicPolicies} for the parameterized topic.
      * @param topicName
@@ -2744,8 +2762,39 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
     }
 
+    private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
+        return pulsar.getPulsarResources().getNamespaceResources()
+                .getPoliciesAsync(topicName.getNamespaceObject())
+                .thenCompose(optPolicies -> {
+                    int maxTopicsPerNamespace = optPolicies.map(p -> p.max_topics_per_namespace)
+                            .orElse(pulsar.getConfig().getMaxTopicsPerNamespace());
+
+                    if (maxTopicsPerNamespace > 0 && !isSystemTopic(topicName)) {
+                        return pulsar().getPulsarResources().getTopicResources()
+                                .getExistingPartitions(topicName)
+                                .thenCompose(topics -> {
+                                    // exclude created system topic
+                                    long topicsCount = topics.stream()
+                                            .filter(t -> !isSystemTopic(TopicName.get(t)))
+                                            .count();
+                                    if (topicsCount + numPartitions > maxTopicsPerNamespace) {
+                                        log.error("Failed to create persistent topic {}, "
+                                                + "exceed maximum number of topics in namespace", topicName);
+                                        return FutureUtil.failedFuture(
+                                                new RestException(Response.Status.PRECONDITION_FAILED,
+                                                        "Exceed maximum number of topics in namespace."));
+                                    } else {
+                                        return CompletableFuture.completedFuture(null);
+                                    }
+                                });
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
     private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions,
-                                            CompletableFuture<T> topicFuture) {
+                                                   CompletableFuture<T> topicFuture) {
         Integer maxTopicsPerNamespace;
         try {
             maxTopicsPerNamespace = pulsar.getConfigurationCache().policiesCache()
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
new file mode 100644
index 00000000000..72ec16752c6
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Constants and helper predicates for identifying Pulsar system topics.
+ */
+public class SystemTopicNames {
+
+    /**
+     * Local topic name for the namespace events.
+     */
+    public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events";
+
+    /**
+     * Local topic name for the transaction buffer snapshot.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
+
+
+    public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
+
+    /**
+     * Local names of the event system topics; {@link #PENDING_ACK_STORE_SUFFIX} is not included.
+     */
+    public static final Set<String> EVENTS_TOPIC_NAMES =
+            Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT));
+
+    public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
+
+    public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
+
+    public static boolean isEventSystemTopic(TopicName topicName) {
+        return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
+    }
+
+    public static boolean isTransactionInternalName(TopicName topicName) {
+        String topic = topicName.toString();
+        return topic.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString())
+                || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
+                || topic.endsWith(PENDING_ACK_STORE_SUFFIX);
+    }
+
+    public static boolean isSystemTopic(TopicName topicName) {
+        TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
+        return isEventSystemTopic(nonePartitionedTopicName) || isTransactionInternalName(nonePartitionedTopicName);
+    }
+}