You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:56:06 UTC

[pulsar] 09/13: [Transaction] Fix broker init transaction related topic. (#11022)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 83a29ecc0332a9bc65a8e4474c27955accb1359f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jun 23 23:07:18 2021 +0800

    [Transaction] Fix broker init transaction related topic. (#11022)
    
    (cherry picked from commit e092fb33699c227c110a9c707ff6fcfedfd1ee7a)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  14 +-
 .../service/persistent/PersistentSubscription.java |   7 +-
 .../broker/service/persistent/PersistentTopic.java |   7 +-
 .../pulsar/broker/transaction/TransactionTest.java | 142 +++++++++++++++++++++
 4 files changed, 157 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index cbd3eb3..0d01968 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -108,6 +109,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.validator.MultipleListenerValidator;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -122,6 +124,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -990,7 +993,7 @@ public class PulsarService implements AutoCloseable {
             for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) {
                 try {
                     TopicName topicName = TopicName.get(topic);
-                    if (bundle.includes(topicName)) {
+                    if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) {
                         CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic);
                         if (future != null) {
                             persistentTopics.add(future);
@@ -1584,4 +1587,13 @@ public class PulsarService implements AutoCloseable {
         return workerConfig;
     }
 
+
+    private static boolean isTransactionSystemTopic(TopicName topicName) {
+        String topic = topicName.toString();
+        return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
+                || topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
+                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
+                || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 16f2259..316ac10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -59,7 +59,6 @@ import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
-import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -77,7 +76,6 @@ import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,10 +139,7 @@ public class PersistentSubscription implements Subscription {
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(TopicName.get(topicName))
-                && !topicName.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName())
-                && !topicName.startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX)
-                && !topicName.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
+                && !checkTopicIsEventsNames(TopicName.get(topicName))) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
         } else {
             this.pendingAckHandle = new PendingAckHandleDisabled();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8f7f356..3d5e10d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -107,7 +107,6 @@ import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
-import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
@@ -149,7 +148,6 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.utils.StatsOutputStream;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -317,10 +315,7 @@ public class PersistentTopic extends AbstractTopic
         checkReplicatedSubscriptionControllerState();
         TopicName topicName = TopicName.get(topic);
         if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(topicName)
-                && !topicName.getEncodedLocalName().startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName())
-                && !topicName.getEncodedLocalName().startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX)
-                && !topicName.getEncodedLocalName().endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
+                && !checkTopicIsEventsNames(topicName)) {
             this.transactionBuffer = brokerService.getPulsar()
                     .getTransactionBufferProvider().newTransactionBuffer(this, transactionCompletableFuture);
         } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
new file mode 100644
index 0000000..ce8b3ae
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction;
+
+import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import com.google.common.collect.Sets;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Pulsar client transaction test.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class TransactionTest extends TransactionTestBase {
+
+    private static final String TENANT = "tnx";
+    private static final String NAMESPACE1 = TENANT + "/ns1";
+
+    /** Runs the base setup with one broker, then creates the cluster, tenants and namespaces used by the test. */
+    @BeforeMethod
+    protected void setup() throws Exception {
+        this.setBrokerCount(1);
+        this.internalSetup();
+
+        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
+        admin.clusters().createCluster(CLUSTER_NAME,
+                ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
+        admin.tenants().createTenant(TENANT,
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NAMESPACE1);
+
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
+
+        // Swap the existing client for one with transactions enabled.
+        pulsarClient.close();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(true)
+                .build();
+    }
+
+    /**
+     * Verifies that, after both namespaces are unloaded and a consumer re-subscribes, the broker's topic
+     * map holds no entry for the transaction log, coordinator-assign, or pending-ack store topics.
+     */
+    @Test
+    public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
+        String subName = "test";
+        String topicName = TopicName.get(NAMESPACE1 + "/test").toString();
+
+        // Closed explicitly right away, so no @Cleanup is needed for this first consumer.
+        Consumer<byte[]> consumer = getConsumer(topicName, subName);
+        consumer.close();
+
+        Awaitility.await().until(this::canCreateTransaction);
+
+        admin.namespaces().unload(NamespaceName.SYSTEM_NAMESPACE.toString());
+        admin.namespaces().unload(NAMESPACE1);
+
+        @Cleanup
+        Consumer<byte[]> consumer1 = getConsumer(topicName, subName);
+
+        Awaitility.await().until(this::canCreateTransaction);
+
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
+                getPulsarServiceList().get(0).getBrokerService().getTopics();
+
+        Assert.assertNull(topics.get(TopicName.get(TopicDomain.persistent.value(),
+                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString() + 0));
+        Assert.assertNull(topics.get(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()));
+        Assert.assertNull(topics.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName)));
+    }
+
+    /** Returns {@code true} if a transaction can be opened now; a failure is logged and yields {@code false}. */
+    private boolean canCreateTransaction() {
+        try {
+            pulsarClient.newTransaction().withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+            return true;
+        } catch (Exception e) {
+            log.debug("Failed to create a transaction; the caller will poll again", e);
+            return false;
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /** Subscribes to {@code topicName} with a Shared subscription and batch index acknowledgment enabled. */
+    public Consumer<byte[]> getConsumer(String topicName, String subName) throws PulsarClientException {
+        return pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+    }
+}
\ No newline at end of file