You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/09 12:38:26 UTC
[pulsar] branch branch-2.11 updated: [cherry-pick][branch-2.11] cherry-pick fixing can not delete namespace by force (#18307) (#18826)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 8138553cefb [cherry-pick][branch-2.11] cherry-pick fixing can not delete namespace by force (#18307) (#18826)
8138553cefb is described below
commit 8138553cefbcab3614b2baa24241cb95388e125b
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Dec 9 20:38:17 2022 +0800
[cherry-pick][branch-2.11] cherry-pick fixing can not delete namespace by force (#18307) (#18826)
### Motivation
Cherry-pick (#18307) to release 2.11.1.
### Modifications
Cherry-pick (#18307) to release 2.11.1.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 84 +++++++++++---
.../broker/service/persistent/PersistentTopic.java | 128 +++++++++++----------
.../apache/pulsar/broker/admin/NamespacesTest.java | 81 +++++++++++--
.../broker/transaction/TransactionProduceTest.java | 29 -----
.../pulsar/broker/transaction/TransactionTest.java | 5 +-
5 files changed, 212 insertions(+), 115 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4d8f49be965..60d171a9819 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -50,6 +50,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
@@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -471,16 +473,26 @@ public abstract class NamespacesBase extends AdminResource {
if (!topics.isEmpty()) {
Set<String> partitionedTopics = new HashSet<>();
Set<String> nonPartitionedTopics = new HashSet<>();
+ Set<String> allSystemTopics = new HashSet<>();
+ Set<String> allPartitionedSystemTopics = new HashSet<>();
for (String topic : topics) {
try {
TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
+ if (pulsar().getBrokerService().isSystemTopic(topicName)) {
+ allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+ continue;
+ }
String partitionedTopic = topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic)) {
partitionedTopics.add(partitionedTopic);
}
} else {
+ if (pulsar().getBrokerService().isSystemTopic(topicName)) {
+ allSystemTopics.add(topic);
+ continue;
+ }
nonPartitionedTopics.add(topic);
}
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
@@ -508,21 +520,24 @@ public abstract class NamespacesBase extends AdminResource {
}
final CompletableFuture<Throwable> topicFutureEx =
- FutureUtil.waitForAll(topicFutures).handle((result, exception) -> {
- if (exception != null) {
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse
- .resume(new RestException((PulsarAdminException) exception.getCause()));
- } else {
- log.error("[{}] Failed to remove forcefully owned namespace {}",
- clientAppId(), namespaceName, exception);
- asyncResponse.resume(new RestException(exception.getCause()));
- }
- return exception;
- }
-
- return null;
- });
+ FutureUtil.waitForAll(topicFutures)
+ .thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics))
+ .thenCompose((ignore) ->
+ internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .handle((result, exception) -> {
+ if (exception != null) {
+ if (exception.getCause() instanceof PulsarAdminException) {
+ asyncResponse.resume(
+ new RestException((PulsarAdminException) exception.getCause()));
+ } else {
+ log.error("[{}] Failed to remove forcefully owned namespace {}",
+ clientAppId(), namespaceName, exception);
+ asyncResponse.resume(new RestException(exception.getCause()));
+ }
+ return exception;
+ }
+ return null;
+ });
if (topicFutureEx.join() != null) {
return;
}
@@ -564,6 +579,45 @@ public abstract class NamespacesBase extends AdminResource {
});
}
+ private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+ log.info("internalDeletePartitionedTopicsAsync");
+ if (CollectionUtils.isEmpty(topicNames)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (Exception ex) {
+ log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
+ return FutureUtil.failedFuture(ex);
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (String topicName : topicNames) {
+ TopicName tn = TopicName.get(topicName);
+ futures.add(admin.topics().deletePartitionedTopicAsync(topicName, true, true));
+ }
+ return FutureUtil.waitForAll(futures);
+ }
+ private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
+ log.info("internalDeleteTopicsAsync");
+ if (CollectionUtils.isEmpty(topicNames)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (Exception ex) {
+ log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
+ return FutureUtil.failedFuture(ex);
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (String topicName : topicNames) {
+ futures.add(admin.topics().deleteAsync(topicName, true, true));
+ }
+ return FutureUtil.waitForAll(futures);
+ }
+
+
@SuppressWarnings("deprecation")
protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
boolean force) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1d1160d7dfb..5ad6891d30c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1178,68 +1178,74 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return null;
});
- closeClientFuture.thenAccept(delete -> {
- CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
- brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
- deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema())
- .thenCompose(__ -> deleteTopicPolicies())
- .thenCompose(__ -> transactionBufferCleanupAndClose())
- .whenComplete((v, ex) -> {
- if (ex != null) {
- log.error("[{}] Error deleting topic", topic, ex);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(ex);
- } else {
- List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
- subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));
-
- FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
- if (e != null) {
- log.error("[{}] Error deleting topic", topic, e);
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(e);
- } else {
- ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- brokerService.removeTopicFromCache(PersistentTopic.this);
-
- dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
- subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
- unregisterTopicPolicyListener();
-
- log.info("[{}] Topic deleted", topic);
- deleteFuture.complete(null);
- }
-
- @Override
- public void deleteLedgerFailed(ManagedLedgerException exception,
- Object ctx) {
- if (exception.getCause()
- instanceof MetadataStoreException.NotFoundException) {
- log.info("[{}] Topic is already deleted {}",
- topic, exception.getMessage());
- deleteLedgerComplete(ctx);
- } else {
- unfenceTopicToResume();
- log.error("[{}] Error deleting topic", topic, exception);
- deleteFuture.completeExceptionally(
- new PersistenceException(exception));
+ closeClientFuture.thenAccept(__ -> {
+ CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
+ brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
+ deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
+ .thenCompose(ignore -> {
+ if (!this.getBrokerService().getPulsar().getBrokerService()
+ .isSystemTopic(TopicName.get(topic))) {
+ return deleteTopicPolicies();
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ })
+ .thenCompose(ignore -> transactionBufferCleanupAndClose())
+ .whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Error deleting topic", topic, ex);
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(ex);
+ } else {
+ List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
+ subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));
+ FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+ if (e != null) {
+ log.error("[{}] Error deleting topic", topic, e);
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(e);
+ } else {
+ ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ brokerService.removeTopicFromCache(PersistentTopic.this);
+
+ dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+ subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+ unregisterTopicPolicyListener();
+
+ log.info("[{}] Topic deleted", topic);
+ deleteFuture.complete(null);
}
- }
- }, null);
- }
- });
- }
- });
- }).exceptionally(ex->{
- unfenceTopicToResume();
- deleteFuture.completeExceptionally(
- new TopicBusyException("Failed to close clients before deleting topic."));
- return null;
- });
+
+ @Override
+ public void deleteLedgerFailed(ManagedLedgerException exception,
+ Object ctx) {
+ if (exception.getCause()
+ instanceof MetadataStoreException.NotFoundException) {
+ log.info("[{}] Topic is already deleted {}",
+ topic, exception.getMessage());
+ deleteLedgerComplete(ctx);
+ } else {
+ unfenceTopicToResume();
+ log.error("[{}] Error deleting topic", topic, exception);
+ deleteFuture.completeExceptionally(
+ new PersistenceException(exception));
+ }
+ }
+ }, null);
+ }
+ });
+ }
+ });
+ }).exceptionally(ex->{
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(
+ new TopicBusyException("Failed to close clients before deleting topic."));
+ return null;
+ });
} finally {
lock.writeLock().unlock();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 93ac82ade44..dc4df8330cc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -70,6 +70,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
@@ -85,6 +86,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
@@ -111,6 +114,7 @@ import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -1389,8 +1393,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
- assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
- new Long(-1));
+ assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadThresholdInBytes(), -1L), 0);
// set an override for the namespace
admin.namespaces().setOffloadThreshold(namespace, 100);
@@ -1406,8 +1410,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
- assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
- new Long(100));
+ assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadThresholdInBytes(), 100L), 0);
// set another negative value to disable
admin.namespaces().setOffloadThreshold(namespace, -2);
@@ -1422,8 +1426,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
- assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
- new Long(-2));
+ assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadThresholdInBytes(), -2L), 0);
// set back to -1 and fall back to default
admin.namespaces().setOffloadThreshold(namespace, -1);
@@ -1438,8 +1442,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
- assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
- new Long(-1));
+ assertEquals(Long.compare(ledgerConf.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadThresholdInBytes(), -1L), 0);
// cleanup
admin.topics().delete(topicName.toString(), true);
@@ -1881,4 +1885,65 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws Exception {
+ String namespace = this.testTenant + "/delete-namespace";
+ String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-namespace",
+ "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+ // 0. enable topic level polices and system topic
+ pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfig().setSystemTopicEnabled(true);
+ pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+ Field policesService = pulsar.getClass().getDeclaredField("topicPoliciesService");
+ policesService.setAccessible(true);
+ policesService.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+ // 1. create a test namespace.
+ admin.namespaces().createNamespace(namespace);
+ // 2. create a test topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // 3. change policy of the topic.
+ admin.topicPolicies().setMaxConsumers(topic, 5);
+ // 4. change the order of the topics in this namespace.
+ List<String> topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
+ Assert.assertTrue(topics.size() >= 2);
+ for (int i = 0; i < topics.size(); i++) {
+ if (topics.get(i).contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+ String systemTopic = topics.get(i);
+ topics.set(i, topics.get(0));
+ topics.set(0, systemTopic);
+ }
+ }
+ NamespaceService mockNamespaceService = spy(pulsar.getNamespaceService());
+ Field namespaceServiceField = pulsar.getClass().getDeclaredField("nsService");
+ namespaceServiceField.setAccessible(true);
+ namespaceServiceField.set(pulsar, mockNamespaceService);
+ doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
+ // 5. delete the namespace
+ admin.namespaces().deleteNamespace(namespace, true);
+
+ }
+
+ @Test
+ public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
+ String namespace = this.testTenant + "/delete-systemTopic";
+ String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
+ "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+ // 0. enable topic level polices and system topic
+ pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfig().setSystemTopicEnabled(true);
+ Field policesService = pulsar.getClass().getDeclaredField("topicPoliciesService");
+ policesService.setAccessible(true);
+ policesService.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+ // 1. create a test namespace.
+ admin.namespaces().createNamespace(namespace);
+ // 2. create a test topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // 3. change policy of the topic.
+ admin.topicPolicies().setMaxConsumers(topic, 5);
+ // 4. delete the policies topic and the topic wil not to clear topic polices
+ admin.topics().delete(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index d43221a64e2..0d1bbda4568 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends TransactionTestBase {
produceTest(true);
}
- @Test
- public void testDeleteNamespaceBeforeCommit() throws Exception {
- final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
- PulsarClient pulsarClient = this.pulsarClient;
- Transaction tnx = pulsarClient.newTransaction()
- .withTransactionTimeout(60, TimeUnit.SECONDS)
- .build().get();
- long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
- long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
- Assert.assertTrue(txnIdMostBits > -1);
- Assert.assertTrue(txnIdLeastBits > -1);
-
- @Cleanup
- Producer<byte[]> outProducer = pulsarClient
- .newProducer()
- .topic(topic)
- .sendTimeout(0, TimeUnit.SECONDS)
- .enableBatching(false)
- .create();
-
- String content = "Hello Txn";
- outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
- try {
- deleteNamespaceGraceFully(NAMESPACE1, true);
- } catch (Exception ignore) {}
- tnx.commit().get();
- }
-
@Test
public void produceAndAbortTest() throws Exception {
produceTest(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 307244a6447..f0417575446 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -166,7 +166,8 @@ public class TransactionTest extends TransactionTestBase {
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
-
+ admin.namespaces().deleteNamespace(NAMESPACE1, true);
+ admin.namespaces().createNamespace(NAMESPACE1);
try {
// init pending ack
@Cleanup
@@ -182,7 +183,7 @@ public class TransactionTest extends TransactionTestBase {
// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
- assertEquals(list.size(), 2);
+ assertFalse(list.isEmpty());
list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));
try {