You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/22 13:21:24 UTC
[pulsar] branch branch-2.8 updated: [Transaction] Fix transaction system topic create in loop (#12889)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 50727d7 [Transaction] Fix transaction system topic create in loop (#12889)
50727d7 is described below
commit 50727d73810d39b878495936324b85c988e4db9b
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Nov 22 21:20:07 2021 +0800
[Transaction] Fix transaction system topic create in loop (#12889)
---
.../org/apache/pulsar/broker/PulsarService.java | 14 ++--
.../broker/admin/impl/PersistentTopicsBase.java | 16 ++++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 2 +
.../pulsar/broker/service/BrokerService.java | 9 +++
.../pulsar/broker/transaction/TransactionTest.java | 78 ++++++++++++++++++++--
.../org/apache/pulsar/common/naming/TopicName.java | 3 +
6 files changed, 109 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ba5a90b..06430aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -135,7 +135,6 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -1673,11 +1672,16 @@ public class PulsarService implements AutoCloseable {
}
- private static boolean isTransactionSystemTopic(TopicName topicName) {
+ public static boolean isTransactionSystemTopic(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
- || topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
- NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
+ || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
+ || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
+ }
+
+ public static boolean isTransactionInternalName(TopicName topicName) {
+ String topic = topicName.toString();
+ return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1db53c0..86116d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.util.Codec.decode;
@@ -164,7 +165,8 @@ public class PersistentTopicsBase extends AdminResource {
try {
String path = String.format("/managed-ledgers/%s/%s", namespaceName.toString(), domain());
for (String topic : getLocalPolicies().getChildren(path)) {
- if (domain().equals(TopicDomain.persistent.toString())) {
+ if (domain().equals(TopicDomain.persistent.toString())
+ && !isTransactionInternalName(TopicName.get(topic))) {
topics.add(TopicName.get(domain(), namespaceName, decode(topic)).toString());
}
}
@@ -198,6 +200,13 @@ public class PersistentTopicsBase extends AdminResource {
return getPartitionedTopicList(TopicDomain.getEnum(domain()));
}
+ protected void validateCreateTopic(TopicName topicName) {
+ if (isTransactionInternalName(topicName)) {
+ log.warn("Try to create a topic in the system topic format! {}", topicName);
+ throw new RestException(Status.BAD_REQUEST, "Cannot create topic in system topic format!");
+ }
+ }
+
protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
@@ -3517,7 +3526,10 @@ public class PersistentTopicsBase extends AdminResource {
} catch (RestException e) {
throw e;
} catch (Exception e) {
- throw new RestException(e);
+ if (e.getCause() instanceof NotAllowedException) {
+ throw new RestException(Status.BAD_REQUEST, e.getCause());
+ }
+ throw new RestException(e.getCause());
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 2c54908..125e3f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -233,6 +233,7 @@ public class PersistentTopics extends PersistentTopicsBase {
validateGlobalNamespaceOwnership(tenant, namespace);
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
+ validateCreateTopic(topicName);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
@@ -265,6 +266,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateGlobalNamespaceOwnership(tenant, namespace);
validateTopicName(tenant, namespace, encodedTopic);
+ validateCreateTopic(topicName);
internalCreateNonPartitionedTopic(authoritative);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 62124fb..b49fb59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
@@ -1282,6 +1283,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return;
}
+ if (isTransactionSystemTopic(topicName)) {
+ String msg = String.format("Can not create transaction system topic %s", topic);
+ log.warn(msg);
+ pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+ topicFuture.completeExceptionally(new NotAllowedException(msg));
+ return;
+ }
+
if (createIfMissing && !checkMaxTopicsPerNamespace(topicName, 1, topicFuture)) {
return;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index fb94638..052700f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -18,12 +18,20 @@
*/
package org.apache.pulsar.broker.transaction;
+import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.FileAssert.fail;
+
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -44,6 +52,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@@ -164,14 +173,14 @@ public class TransactionTest extends TransactionTestBase {
Transaction transaction = pulsarClient.newTransaction()
.build().get();
TxnID txnID = transaction.getTxnID();
- Assert.assertEquals(txnID.getLeastSigBits(), 0);
- Assert.assertEquals(txnID.getMostSigBits(), 0);
+ assertEquals(txnID.getLeastSigBits(), 0);
+ assertEquals(txnID.getMostSigBits(), 0);
transaction.abort();
transaction = pulsarClient.newTransaction()
.build().get();
txnID = transaction.getTxnID();
- Assert.assertEquals(txnID.getLeastSigBits(), 1);
- Assert.assertEquals(txnID.getMostSigBits(), 0);
+ assertEquals(txnID.getLeastSigBits(), 1);
+ assertEquals(txnID.getMostSigBits(), 0);
}
@Test
@@ -228,12 +237,12 @@ public class TransactionTest extends TransactionTestBase {
Assert.fail();
}
TopicPolicies topicPolicies = originPersistentTopic.getTopicPolicies().get();
- Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize);
+ assertEquals(retentionSizeInMbSetTopic, retentionSize);
MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider();
CompletableFuture<PendingAckStore> future = mlPendingAckStoreProvider.newPendingAckStore(subscription);
future.thenAccept(pendingAckStore -> {
((MLPendingAckStore) pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> {
- Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
+ assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
retentionSizeInMbSetTo);
});
}
@@ -246,4 +255,61 @@ public class TransactionTest extends TransactionTestBase {
}
+ @Test
+ public void testCreateTransactionSystemTopic() throws Exception {
+ String subName = "test";
+ String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
+
+ try {
+ // init pending ack
+ @Cleanup
+ Consumer<byte[]> consumer = getConsumer(topicName, subName);
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+
+ consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
+ }
+ topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);
+
+ // getList does not include transaction system topic
+ List<String> list = admin.topics().getList(NAMESPACE1);
+ assertEquals(list.size(), 3);
+ list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
+
+ try {
+ // can't create transaction system topic
+ @Cleanup
+ Consumer<byte[]> consumer = getConsumer(topicName, subName);
+ fail();
+ } catch (PulsarClientException.NotAllowedException e) {
+ assertTrue(e.getMessage().contains("Can not create transaction system topic"));
+ }
+
+ // can't create transaction system topic
+ try {
+ admin.topics().getSubscriptions(topicName);
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
+ }
+
+ // can't create transaction system topic
+ try {
+ admin.topics().createPartitionedTopic(topicName, 3);
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+ }
+
+ // can't create transaction system topic
+ try {
+ admin.topics().createNonPartitionedTopic(topicName);
+ fail();
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
+ }
+ }
+
}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index fd183df..b8537d2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -67,6 +67,9 @@ public class TopicName implements ServiceUnitId {
public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
+ public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
+ NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
+
public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
return TopicName.get(name);