You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/11/13 01:16:59 UTC

[pulsar] branch master updated: [Transaction] Fix transaction system topic create in loop. (#12749)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c4d913c [Transaction] Fix transaction system topic create in loop. (#12749)
2c4d913c is described below

commit 2c4d913c4b3fb1c6d924efaa0a24c93a2d2de7d0
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Nov 13 09:15:30 2021 +0800

    [Transaction] Fix transaction system topic create in loop. (#12749)
    
    fix https://github.com/apache/pulsar/issues/12727
    ### Motivation
    Now transaction system topic can be created.
    
    ### Modifications
    we should not allow broker or user create by transaction system format topic.
    
    1. checkout topic auto create.
    2. admin create topic.
    
    ### Verifying this change
    
    add some test for it
---
 .../org/apache/pulsar/broker/PulsarService.java    | 14 +++--
 .../broker/admin/impl/PersistentTopicsBase.java    | 17 +++++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  2 +
 .../pulsar/broker/service/BrokerService.java       |  9 +++
 .../pulsar/broker/transaction/TransactionTest.java | 64 ++++++++++++++++++++++
 .../pulsar/common/events/EventsTopicNames.java     |  3 +-
 .../org/apache/pulsar/common/naming/TopicName.java |  3 +
 7 files changed, 104 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index bca21ae..28ccbb1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
-import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -132,7 +132,6 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -1649,11 +1648,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
 
-    private static boolean isTransactionSystemTopic(TopicName topicName) {
+    public static boolean isTransactionSystemTopic(TopicName topicName) {
         String topic = topicName.toString();
         return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
-                || topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
-                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
+                || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
+                || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
+    }
+
+    public static boolean isTransactionInternalName(TopicName topicName) {
+        String topic = topicName.toString();
+        return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
                 || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ef4510c..fc33ad4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
 import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -161,7 +162,9 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         try {
-            return topicResources().listPersistentTopicsAsync(namespaceName).join();
+            return topicResources().listPersistentTopicsAsync(namespaceName).thenApply(topics ->
+                    topics.stream().filter(topic ->
+                            !isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList())).join();
         } catch (Exception e) {
             log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e);
             throw new RestException(e);
@@ -244,6 +247,13 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected void validateCreateTopic(TopicName topicName) {
+        if (isTransactionInternalName(topicName)) {
+            log.warn("Try to create a topic in the system topic format! {}", topicName);
+            throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!");
+        }
+    }
+
     public void validateAdminOperationOnTopic(boolean authoritative) {
         validateAdminAccessForTenant(topicName.getTenant());
         validateTopicOwnership(topicName, authoritative);
@@ -3683,7 +3693,10 @@ public class PersistentTopicsBase extends AdminResource {
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
-            throw new RestException(e);
+            if (e.getCause() instanceof NotAllowedException) {
+                throw new RestException(Status.CONFLICT, e.getCause());
+            }
+            throw new RestException(e.getCause());
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1318057..bf6c647 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -234,6 +234,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             validateGlobalNamespaceOwnership();
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
             validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
+            validateCreateTopic(topicName);
             internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
@@ -267,6 +268,7 @@ public class PersistentTopics extends PersistentTopicsBase {
         validateNamespaceName(tenant, namespace);
         validateGlobalNamespaceOwnership();
         validateTopicName(tenant, namespace, encodedTopic);
+        validateCreateTopic(topicName);
         internalCreateNonPartitionedTopic(authoritative);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9194820..ba3a74d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -1284,6 +1285,14 @@ public class BrokerService implements Closeable {
             return;
         }
 
+        if (isTransactionSystemTopic(topicName)) {
+            String msg = String.format("Can not create transaction system topic %s", topic);
+            log.warn(msg);
+            pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+            topicFuture.completeExceptionally(new NotAllowedException(msg));
+            return;
+        }
+
         CompletableFuture<Void> maxTopicsCheck = createIfMissing
                 ? checkMaxTopicsPerNamespace(topicName, 1)
                 : CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 930ab92..df527da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
 import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -26,13 +27,19 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 import com.google.common.collect.Sets;
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Field;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -102,6 +109,63 @@ public class TransactionTest extends TransactionTestBase {
     }
 
     @Test
+    public void testCreateTransactionSystemTopic() throws Exception {
+        String subName = "test";
+        String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
+
+        try {
+            // init pending ack
+            @Cleanup
+            Consumer<byte[]> consumer = getConsumer(topicName, subName);
+            Transaction transaction = pulsarClient.newTransaction()
+                    .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+
+            consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
+        }
+        topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);
+
+        // getList does not include transaction system topic
+        List<String> list = admin.topics().getList(NAMESPACE1);
+        assertEquals(list.size(), 4);
+        list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
+
+        try {
+            // can't create transaction system topic
+            @Cleanup
+            Consumer<byte[]> consumer = getConsumer(topicName, subName);
+            fail();
+        } catch (PulsarClientException.NotAllowedException e) {
+            assertTrue(e.getMessage().contains("Can not create transaction system topic"));
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().getSubscriptions(topicName);
+            fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().createPartitionedTopic(topicName, 3);
+            fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().createNonPartitionedTopic(topicName);
+            fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+        }
+    }
+
+    @Test
     public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
         String subName = "test";
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
index 2aa9e12..f82c9ae 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
@@ -49,6 +49,7 @@ public class EventsTopicNames {
     }
 
     public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) {
-        return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString().equals(topicName.toString());
+        return topicName != null && topicName.toString()
+                .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 257d27c..efbd8c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -63,6 +63,9 @@ public class TopicName implements ServiceUnitId {
     public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
             NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
 
+    public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
+
     public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
         String name = domain + "://" + namespaceName.toString() + '/' + topic;
         return TopicName.get(name);