Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/19 05:48:34 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #11640: [Enhancement] avoid duplicate deletion of schema

codelipenghui commented on a change in pull request #11640:
URL: https://github.com/apache/pulsar/pull/11640#discussion_r691802592



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -415,14 +415,40 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
         try {
             // firstly remove all topics including system topics
             if (!topics.isEmpty()) {
+                Set<String> partitionedTopics = new HashSet<>();
+                Set<String> nonPartitionedTopics = new HashSet<>();
+
                 for (String topic : topics) {
                     try {
-                        futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                        TopicName topicName = TopicName.get(topic);
+                        if (topicName.isPartitioned()) {
+                            String partitionedTopic = topicName.getPartitionedTopicName();
+                            if (!partitionedTopics.contains(partitionedTopic)) {
+                                // Distinguish partitioned topic to avoid duplicate deletion of the same schema
+                                futures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
+                                        partitionedTopic, true, true));
+                                partitionedTopics.add(partitionedTopic);
+                            }
+                        } else {
+                            futures.add(pulsar().getAdminClient().topics().deleteAsync(
+                                    topic, true, true));
+                            nonPartitionedTopics.add(topic);
+                        }
                     } catch (Exception e) {
-                        log.error("[{}] Failed to force delete topic {}", clientAppId(), topic, e);
-                        asyncResponse.resume(new RestException(e));
+                        String errorMessage = String.format("Failed to force delete topic %s, "
+                                        + "but the previous deletion command of partitioned-topics:%s "
+                                        + "and non-partitioned-topics:%s have been sent out asynchronously. "
+                                        + "Reason: %s",
+                                topic, partitionedTopics, nonPartitionedTopics, e.getCause());
+                        log.error("[{}] {}", clientAppId(), errorMessage, e);
+                        asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, errorMessage));
+                        return;
                     }
                 }
+
+                log.info("Successfully send deletion command of partitioned-topics:{} "

Review comment:
       Should this be a debug-level log? If there are many partitioned and non-partitioned topics, this log line will be very long. Maybe printing the topic counts would be better?
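
       A minimal sketch of what that suggestion could look like: log only the counts at info level, and keep the full topic sets behind a debug-level guard. To stay self-contained it uses `java.util.logging` (where `FINE` roughly corresponds to debug) rather than the broker's own logger, and the class and method names (`DeletionLogSketch`, `summary`, `logDeletionCommands`) are hypothetical, not part of the PR.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not code from the PR: illustrates "counts at info, details at debug".
public class DeletionLogSketch {
    private static final Logger LOG = Logger.getLogger(DeletionLogSketch.class.getName());

    // Builds a short, bounded-length summary regardless of how many topics exist.
    static String summary(Set<String> partitionedTopics, Set<String> nonPartitionedTopics) {
        return String.format(
                "Sent deletion commands for %d partitioned topics and %d non-partitioned topics",
                partitionedTopics.size(), nonPartitionedTopics.size());
    }

    static void logDeletionCommands(Set<String> partitionedTopics, Set<String> nonPartitionedTopics) {
        // Info level: counts only, so the line stays short for large namespaces.
        LOG.info(summary(partitionedTopics, nonPartitionedTopics));
        // Debug-equivalent level: the full sets, formatted only when that level is enabled.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(String.format("partitioned-topics:%s non-partitioned-topics:%s",
                    partitionedTopics, nonPartitionedTopics));
        }
    }

    public static void main(String[] args) {
        Set<String> partitioned = new TreeSet<>();
        partitioned.add("persistent://tenant/ns/orders");
        partitioned.add("persistent://tenant/ns/payments");
        Set<String> nonPartitioned = new TreeSet<>();
        nonPartitioned.add("persistent://tenant/ns/audit");
        logDeletionCommands(partitioned, nonPartitioned);
    }
}
```

       The guard around the detailed message avoids formatting potentially large sets when debug output is disabled.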




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org