You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/22 13:21:24 UTC

[pulsar] branch branch-2.8 updated: [Transaction] Fix transaction system topic create in loop (#12889)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 50727d7  [Transaction] Fix transaction system topic create in loop (#12889)
50727d7 is described below

commit 50727d73810d39b878495936324b85c988e4db9b
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Nov 22 21:20:07 2021 +0800

    [Transaction] Fix transaction system topic create in loop (#12889)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 14 ++--
 .../broker/admin/impl/PersistentTopicsBase.java    | 16 ++++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  2 +
 .../pulsar/broker/service/BrokerService.java       |  9 +++
 .../pulsar/broker/transaction/TransactionTest.java | 78 ++++++++++++++++++++--
 .../org/apache/pulsar/common/naming/TopicName.java |  3 +
 6 files changed, 109 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ba5a90b..06430aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -135,7 +135,6 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -1673,11 +1672,16 @@ public class PulsarService implements AutoCloseable {
     }
 
 
-    private static boolean isTransactionSystemTopic(TopicName topicName) {
+    public static boolean isTransactionSystemTopic(TopicName topicName) {
         String topic = topicName.toString();
         return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
-                || topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
-                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
+                || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
+                || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
+    }
+
+    public static boolean isTransactionInternalName(TopicName topicName) {
+        String topic = topicName.toString();
+        return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
                 || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1db53c0..86116d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
 import static org.apache.pulsar.common.util.Codec.decode;
@@ -164,7 +165,8 @@ public class PersistentTopicsBase extends AdminResource {
         try {
             String path = String.format("/managed-ledgers/%s/%s", namespaceName.toString(), domain());
             for (String topic : getLocalPolicies().getChildren(path)) {
-                if (domain().equals(TopicDomain.persistent.toString())) {
+                if (domain().equals(TopicDomain.persistent.toString())
+                        && !isTransactionInternalName(TopicName.get(topic))) {
                     topics.add(TopicName.get(domain(), namespaceName, decode(topic)).toString());
                 }
             }
@@ -198,6 +200,13 @@ public class PersistentTopicsBase extends AdminResource {
         return getPartitionedTopicList(TopicDomain.getEnum(domain()));
     }
 
+    protected void validateCreateTopic(TopicName topicName) {
+        if (isTransactionInternalName(topicName)) {
+            log.warn("Try to create a topic in the system topic format! {}", topicName);
+            throw new RestException(Status.BAD_REQUEST, "Cannot create topic in system topic format!");
+        }
+    }
+
     protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
@@ -3517,7 +3526,10 @@ public class PersistentTopicsBase extends AdminResource {
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
-            throw new RestException(e);
+            if (e.getCause() instanceof NotAllowedException) {
+                throw new RestException(Status.BAD_REQUEST, e.getCause());
+            }
+            throw new RestException(e.getCause());
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 2c54908..125e3f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -233,6 +233,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             validateGlobalNamespaceOwnership(tenant, namespace);
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
             validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
+            validateCreateTopic(topicName);
             internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
@@ -265,6 +266,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateGlobalNamespaceOwnership(tenant, namespace);
         validateTopicName(tenant, namespace, encodedTopic);
+        validateCreateTopic(topicName);
         internalCreateNonPartitionedTopic(authoritative);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 62124fb..b49fb59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
@@ -1282,6 +1283,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             return;
         }
 
+        if (isTransactionSystemTopic(topicName)) {
+            String msg = String.format("Can not create transaction system topic %s", topic);
+            log.warn(msg);
+            pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+            topicFuture.completeExceptionally(new NotAllowedException(msg));
+            return;
+        }
+
         if (createIfMissing && !checkMaxTopicsPerNamespace(topicName, 1, topicFuture)) {
             return;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index fb94638..052700f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -18,12 +18,20 @@
  */
 package org.apache.pulsar.broker.transaction;
 
+import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
 import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.FileAssert.fail;
+
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -44,6 +52,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -164,14 +173,14 @@ public class TransactionTest extends TransactionTestBase {
         Transaction transaction = pulsarClient.newTransaction()
                 .build().get();
         TxnID txnID = transaction.getTxnID();
-        Assert.assertEquals(txnID.getLeastSigBits(), 0);
-        Assert.assertEquals(txnID.getMostSigBits(), 0);
+        assertEquals(txnID.getLeastSigBits(), 0);
+        assertEquals(txnID.getMostSigBits(), 0);
         transaction.abort();
         transaction = pulsarClient.newTransaction()
                 .build().get();
         txnID = transaction.getTxnID();
-        Assert.assertEquals(txnID.getLeastSigBits(), 1);
-        Assert.assertEquals(txnID.getMostSigBits(), 0);
+        assertEquals(txnID.getLeastSigBits(), 1);
+        assertEquals(txnID.getMostSigBits(), 0);
     }
 
     @Test
@@ -228,12 +237,12 @@ public class TransactionTest extends TransactionTestBase {
                     Assert.fail();
                 }
                 TopicPolicies topicPolicies = originPersistentTopic.getTopicPolicies().get();
-                Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize);
+                assertEquals(retentionSizeInMbSetTopic, retentionSize);
                 MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider();
                 CompletableFuture<PendingAckStore> future = mlPendingAckStoreProvider.newPendingAckStore(subscription);
                 future.thenAccept(pendingAckStore -> {
                             ((MLPendingAckStore) pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> {
-                                Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
+                                assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
                                         retentionSizeInMbSetTo);
                             });
                         }
@@ -246,4 +255,61 @@ public class TransactionTest extends TransactionTestBase {
 
     }
 
+    @Test
+    public void testCreateTransactionSystemTopic() throws Exception {
+        String subName = "test";
+        String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
+
+        try {
+            // init pending ack
+            @Cleanup
+            Consumer<byte[]> consumer = getConsumer(topicName, subName);
+            Transaction transaction = pulsarClient.newTransaction()
+                    .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+
+            consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
+        }
+        topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);
+
+        // getList does not include transaction system topic
+        List<String> list = admin.topics().getList(NAMESPACE1);
+        assertEquals(list.size(), 3);
+        list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
+
+        try {
+            // can't create transaction system topic
+            @Cleanup
+            Consumer<byte[]> consumer = getConsumer(topicName, subName);
+            fail();
+        } catch (PulsarClientException.NotAllowedException e) {
+            assertTrue(e.getMessage().contains("Can not create transaction system topic"));
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().getSubscriptions(topicName);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().createPartitionedTopic(topicName, 3);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().createNonPartitionedTopic(topicName);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index fd183df..b8537d2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -67,6 +67,9 @@ public class TopicName implements ServiceUnitId {
     public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
             NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
 
+    public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
+
     public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
         String name = domain + "://" + namespaceName.toString() + '/' + topic;
         return TopicName.get(name);