You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/01 03:12:36 UTC
[pulsar] 09/10: [Broker] Fix error when "checkGC#tryToDeletePartitionedMetadata" is ignored (#14482)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0bbf05b82e7ab8d2dce28f5ef2116a335ee81e84
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Mon Feb 28 16:36:07 2022 +0800
[Broker] Fix error when "checkGC#tryToDeletePartitionedMetadata" is ignored (#14482)
(cherry picked from commit a9fe9efe549af40ff0a90b970064f17af72a786b)
---
.../service/nonpersistent/NonPersistentTopic.java | 22 +++++++++++-----------
.../broker/service/NonPersistentTopicE2ETest.java | 12 +++++++-----
2 files changed, 18 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 6c14edf..71f1764 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -916,7 +916,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
}
stopReplProducers().thenCompose(v -> delete(true, false, true))
- .thenAccept(__ -> tryToDeletePartitionedMetadata())
+ .thenCompose(__ -> tryToDeletePartitionedMetadata())
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
Throwable throwable = e.getCause();
@@ -943,16 +943,16 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
return CompletableFuture.completedFuture(null);
}
TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
- try {
- NamespaceResources.PartitionedTopicResources partitionedTopicResources = brokerService.pulsar()
- .getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
- if (!partitionedTopicResources.partitionedTopicExists(topicName)) {
- return CompletableFuture.completedFuture(null);
- }
- return partitionedTopicResources.deletePartitionedTopicAsync(topicName);
- } catch (Exception e) {
- return FutureUtil.failedFuture(e);
- }
+ NamespaceResources.PartitionedTopicResources partitionedTopicResources = brokerService.pulsar()
+ .getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
+ return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
+ .thenCompose(partitionedTopicExist -> {
+ if (!partitionedTopicExist) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return partitionedTopicResources.deletePartitionedTopicAsync(topicName);
+ }
+ });
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 83fad2b..0e598f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -37,11 +37,12 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -235,10 +236,11 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
admin.topics().createPartitionedTopic(topicGc, partitions);
Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create();
producer3.close();
- assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
- TopicName.get(topicGc)).join().partitions == partitions);
+ assertEquals(partitions, pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+ TopicName.get(topicGc)).join().partitions);
runGC();
- assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
- TopicName.get(topicGc)).join().partitions == 0);
+ Awaitility.await().untilAsserted(()->
+ assertEquals(pulsar.getBrokerService().
+ fetchPartitionedTopicMetadataAsync(TopicName.get(topicGc)).join().partitions, 0));
}
}