You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/11/13 01:16:59 UTC
[pulsar] branch master updated: [Transaction] Fix transaction
system topic create in loop. (#12749)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2c4d913c [Transaction] Fix transaction system topic create in loop. (#12749)
2c4d913c is described below
commit 2c4d913c4b3fb1c6d924efaa0a24c93a2d2de7d0
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Nov 13 09:15:30 2021 +0800
[Transaction] Fix transaction system topic create in loop. (#12749)
fix https://github.com/apache/pulsar/issues/12727
### Motivation
Now transaction system topic can be created.
### Modifications
we should not allow broker or user create by transaction system format topic.
1. checkout topic auto create.
2. admin create topic.
### Verifying this change
add some test for it
---
.../org/apache/pulsar/broker/PulsarService.java | 14 +++--
.../broker/admin/impl/PersistentTopicsBase.java | 17 +++++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 2 +
.../pulsar/broker/service/BrokerService.java | 9 +++
.../pulsar/broker/transaction/TransactionTest.java | 64 ++++++++++++++++++++++
.../pulsar/common/events/EventsTopicNames.java | 3 +-
.../org/apache/pulsar/common/naming/TopicName.java | 3 +
7 files changed, 104 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index bca21ae..28ccbb1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
-import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -132,7 +132,6 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -1649,11 +1648,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
}
- private static boolean isTransactionSystemTopic(TopicName topicName) {
+ public static boolean isTransactionSystemTopic(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
- || topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
- NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
+ || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
+ || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
+ }
+
+ public static boolean isTransactionInternalName(TopicName topicName) {
+ String topic = topicName.toString();
+ return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ef4510c..fc33ad4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -161,7 +162,9 @@ public class PersistentTopicsBase extends AdminResource {
}
try {
- return topicResources().listPersistentTopicsAsync(namespaceName).join();
+ return topicResources().listPersistentTopicsAsync(namespaceName).thenApply(topics ->
+ topics.stream().filter(topic ->
+ !isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList())).join();
} catch (Exception e) {
log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
@@ -244,6 +247,13 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ protected void validateCreateTopic(TopicName topicName) {
+ if (isTransactionInternalName(topicName)) {
+ log.warn("Try to create a topic in the system topic format! {}", topicName);
+ throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!");
+ }
+ }
+
public void validateAdminOperationOnTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
@@ -3683,7 +3693,10 @@ public class PersistentTopicsBase extends AdminResource {
} catch (RestException e) {
throw e;
} catch (Exception e) {
- throw new RestException(e);
+ if (e.getCause() instanceof NotAllowedException) {
+ throw new RestException(Status.CONFLICT, e.getCause());
+ }
+ throw new RestException(e.getCause());
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1318057..bf6c647 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -234,6 +234,7 @@ public class PersistentTopics extends PersistentTopicsBase {
validateGlobalNamespaceOwnership();
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
+ validateCreateTopic(topicName);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
@@ -267,6 +268,7 @@ public class PersistentTopics extends PersistentTopicsBase {
validateNamespaceName(tenant, namespace);
validateGlobalNamespaceOwnership();
validateTopicName(tenant, namespace, encodedTopic);
+ validateCreateTopic(topicName);
internalCreateNonPartitionedTopic(authoritative);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9194820..ba3a74d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -1284,6 +1285,14 @@ public class BrokerService implements Closeable {
return;
}
+ if (isTransactionSystemTopic(topicName)) {
+ String msg = String.format("Can not create transaction system topic %s", topic);
+ log.warn(msg);
+ pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+ topicFuture.completeExceptionally(new NotAllowedException(msg));
+ return;
+ }
+
CompletableFuture<Void> maxTopicsCheck = createIfMissing
? checkMaxTopicsPerNamespace(topicName, 1)
: CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 930ab92..df527da 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -26,13 +27,19 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
import com.google.common.collect.Sets;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -102,6 +109,63 @@ public class TransactionTest extends TransactionTestBase {
}
@Test
+ public void testCreateTransactionSystemTopic() throws Exception {
+ String subName = "test";
+ String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
+
+ try {
+ // init pending ack
+ @Cleanup
+ Consumer<byte[]> consumer = getConsumer(topicName, subName);
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+
+ consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
+ }
+ topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);
+
+ // getList does not include transaction system topic
+ List<String> list = admin.topics().getList(NAMESPACE1);
+ assertEquals(list.size(), 4);
+ list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
+
+ try {
+ // can't create transaction system topic
+ @Cleanup
+ Consumer<byte[]> consumer = getConsumer(topicName, subName);
+ fail();
+ } catch (PulsarClientException.NotAllowedException e) {
+ assertTrue(e.getMessage().contains("Can not create transaction system topic"));
+ }
+
+ // can't create transaction system topic
+ try {
+ admin.topics().getSubscriptions(topicName);
+ fail();
+ } catch (PulsarAdminException.ConflictException e) {
+ assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
+ }
+
+ // can't create transaction system topic
+ try {
+ admin.topics().createPartitionedTopic(topicName, 3);
+ fail();
+ } catch (PulsarAdminException.ConflictException e) {
+ assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+ }
+
+ // can't create transaction system topic
+ try {
+ admin.topics().createNonPartitionedTopic(topicName);
+ fail();
+ } catch (PulsarAdminException.ConflictException e) {
+ assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+ }
+ }
+
+ @Test
public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
String subName = "test";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
index 2aa9e12..f82c9ae 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
@@ -49,6 +49,7 @@ public class EventsTopicNames {
}
public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) {
- return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString().equals(topicName.toString());
+ return topicName != null && topicName.toString()
+ .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 257d27c..efbd8c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -63,6 +63,9 @@ public class TopicName implements ServiceUnitId {
public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
+ public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
+ NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
+
public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
return TopicName.get(name);