You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/15 08:40:36 UTC

[pulsar] 01/03: [Transaction] Fix transaction system topic create in loop. (#12749)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 38b05dc6f201c6b1c32dc54ddabc23349210a247
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Nov 13 09:15:30 2021 +0800

    [Transaction] Fix transaction system topic create in loop. (#12749)
    
    fix https://github.com/apache/pulsar/issues/12727
    Currently, transaction system topics can be created.
    
    We should not allow the broker or users to create topics whose names use the transaction system topic format.
    
    1. Check on topic auto-creation.
    2. Check when a topic is created through the admin API.
    
    Add some tests for it.
    
    (cherry picked from commit 2c4d913c4b3fb1c6d924efaa0a24c93a2d2de7d0)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 14 +++--
 .../broker/admin/impl/PersistentTopicsBase.java    | 17 ++++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  2 +
 .../pulsar/broker/service/BrokerService.java       |  9 +++
 .../pulsar/broker/transaction/TransactionTest.java | 73 ++++++++++++++++++++++
 .../pulsar/common/events/EventsTopicNames.java     |  3 +-
 .../org/apache/pulsar/common/naming/TopicName.java |  3 +
 7 files changed, 113 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 14f5003..c897f15 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
-import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -132,7 +132,6 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -1648,11 +1647,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
 
-    private static boolean isTransactionSystemTopic(TopicName topicName) {
+    public static boolean isTransactionSystemTopic(TopicName topicName) {
         String topic = topicName.toString();
         return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
-                || topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
-                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
+                || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
+                || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
+    }
+
+    public static boolean isTransactionInternalName(TopicName topicName) {
+        String topic = topicName.toString();
+        return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
                 || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a6d8f2c..f223869 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
 import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -160,7 +161,9 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         try {
-            return topicResources().listPersistentTopicsAsync(namespaceName).join();
+            return topicResources().listPersistentTopicsAsync(namespaceName).thenApply(topics ->
+                    topics.stream().filter(topic ->
+                            !isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList())).join();
         } catch (Exception e) {
             log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e);
             throw new RestException(e);
@@ -243,6 +246,13 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected void validateCreateTopic(TopicName topicName) {
+        if (isTransactionInternalName(topicName)) {
+            log.warn("Try to create a topic in the system topic format! {}", topicName);
+            throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!");
+        }
+    }
+
     public void validateAdminOperationOnTopic(boolean authoritative) {
         validateAdminAccessForTenant(topicName.getTenant());
         validateTopicOwnership(topicName, authoritative);
@@ -3598,7 +3608,10 @@ public class PersistentTopicsBase extends AdminResource {
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
-            throw new RestException(e);
+            if (e.getCause() instanceof NotAllowedException) {
+                throw new RestException(Status.CONFLICT, e.getCause());
+            }
+            throw new RestException(e.getCause());
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 09b694c..1aeaccf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -234,6 +234,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             validateGlobalNamespaceOwnership();
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
             validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
+            validateCreateTopic(topicName);
             internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
@@ -267,6 +268,7 @@ public class PersistentTopics extends PersistentTopicsBase {
         validateNamespaceName(tenant, namespace);
         validateGlobalNamespaceOwnership();
         validateTopicName(tenant, namespace, encodedTopic);
+        validateCreateTopic(topicName);
         internalCreateNonPartitionedTopic(authoritative);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 456af1f..e592336 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -1226,6 +1227,14 @@ public class BrokerService implements Closeable {
             return;
         }
 
+        if (isTransactionSystemTopic(topicName)) {
+            String msg = String.format("Can not create transaction system topic %s", topic);
+            log.warn(msg);
+            pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+            topicFuture.completeExceptionally(new NotAllowedException(msg));
+            return;
+        }
+
         CompletableFuture<Void> maxTopicsCheck = createIfMissing
                 ? checkMaxTopicsPerNamespace(topicName, 1)
                 : CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e14c777..67da2d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -19,11 +19,27 @@
 package org.apache.pulsar.broker.transaction;
 
 import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
+import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -106,6 +122,63 @@ public class TransactionTest extends TransactionTestBase {
     }
 
     @Test
+    public void testCreateTransactionSystemTopic() throws Exception {
+        String subName = "test";
+        String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
+
+        try {
+            // init pending ack
+            @Cleanup
+            Consumer<byte[]> consumer = getConsumer(topicName, subName);
+            Transaction transaction = pulsarClient.newTransaction()
+                    .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+
+            consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
+        }
+        topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);
+
+        // getList does not include transaction system topic
+        List<String> list = admin.topics().getList(NAMESPACE1);
+        assertEquals(list.size(), 4);
+        list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
+
+        try {
+            // can't create transaction system topic
+            @Cleanup
+            Consumer<byte[]> consumer = getConsumer(topicName, subName);
+            fail();
+        } catch (PulsarClientException.NotAllowedException e) {
+            assertTrue(e.getMessage().contains("Can not create transaction system topic"));
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().getSubscriptions(topicName);
+            fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().createPartitionedTopic(topicName, 3);
+            fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+        }
+
+        // can't create transaction system topic
+        try {
+            admin.topics().createNonPartitionedTopic(topicName);
+            fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+        }
+    }
+
+    @Test
     public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
         String subName = "test";
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
index 2aa9e12..f82c9ae 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
@@ -49,6 +49,7 @@ public class EventsTopicNames {
     }
 
     public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) {
-        return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString().equals(topicName.toString());
+        return topicName != null && topicName.toString()
+                .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 2cb9f23..3729697 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -63,6 +63,9 @@ public class TopicName implements ServiceUnitId {
     public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
             NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
 
+    public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
+
     public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
         String name = domain + "://" + namespaceName.toString() + '/' + topic;
         return TopicName.get(name);