Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/19 09:59:08 UTC

[GitHub] [pulsar] TakaHiro0208 opened a new pull request, #15217: [pulsar-broker] Fix update topic partitions failed

TakaHiro0208 opened a new pull request, #15217:
URL: https://github.com/apache/pulsar/pull/15217

   
   
   ### Motivation
   I encountered the same problem described in #11682, and I noticed two PRs for it: #10374 and #11683.
   
   But I think some problems still remain.
   
   internalUpdatePartitionedTopic actually consists of three parts:
   1. tryCreatePartitionsAsync(): creates the new topic partitions in the metadata store, such as ZooKeeper
   2. createSubscriptions(): creates subscriptions for the new partitions of an existing partitioned topic
   3. updatePartitionedTopicAsync(): updates the partitioned-topic metadata (sets the partition number to newPartition)
   
   Suppose a partitioned topic has subscriptions and we want to update its partition count.
   
   If parts 1 and 2 succeed but part 3 fails, the managed-ledger znode needs to be cleaned up. But the clean-up throws a ZooKeeper "Directory not empty" exception, because the znode already has the subscription child nodes that were created successfully in part 2.
   
   ![wecom-temp-977f0ae410a236f38d7822fc12d653b6](https://user-images.githubusercontent.com/13505225/163978509-150292e3-8ff8-4ccf-94e7-6bd465b479d8.png)
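
The failure sequence described above can be sketched with a small in-memory model. This is only an illustration, not the broker code: `TinyStore`, the paths, and `run` are hypothetical names, and the only behaviour borrowed from ZooKeeper is that a node with children cannot be deleted.

```java
import java.util.TreeSet;

public class PartialUpdateSketch {
    // Hypothetical stand-in for a hierarchical metadata store such as ZooKeeper.
    static final class TinyStore {
        private final TreeSet<String> nodes = new TreeSet<>();

        void create(String path) {
            nodes.add(path);
        }

        // Like ZooKeeper, refuse to delete a node that still has children.
        void delete(String path) {
            String prefix = path + "/";
            for (String p : nodes) {
                if (p.startsWith(prefix)) {
                    throw new IllegalStateException("Directory not empty: " + path);
                }
            }
            nodes.remove(path);
        }
    }

    // Runs the three parts in the original order and reports what happened.
    static String run(boolean failPart3) {
        TinyStore store = new TinyStore();
        String partition = "/managed-ledgers/my-topic-partition-2"; // illustrative path
        store.create(partition);            // part 1: create the new partition
        store.create(partition + "/sub-a"); // part 2: create its subscription
        if (!failPart3) {
            return "updated";               // part 3: metadata update succeeded
        }
        try {
            store.delete(partition);        // clean-up after part 3 failed
            return "cleaned";
        } catch (IllegalStateException notEmpty) {
            return "cleanup-failed: " + notEmpty.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true));
    }
}
```

In this model the clean-up can only succeed if the subscription children are removed first, which is why the partition node is left behind after a failed part 3.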
   
   
   And when we retry updatePartition, it throws the error below, because part 2 has already completed:
   ![wecom-temp-96b53fea557f4984494a8d213872f1d3](https://user-images.githubusercontent.com/13505225/163978565-27750a2b-5fe8-4fa0-a30e-dd58296ad60e.png)
   
   
   So we need to retry updatePartition with "force=true". The "Subscription already exists" error is then caught and the updatePartition operation completes, but part 3 is permanently skipped.
   
   
   ### Modifications
   
   After catching the "Subscription already exists" error, perform the updatePartitionedTopicAsync() operation.
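
A minimal sketch of this change, using simplified stand-ins rather than the real broker classes. `ConflictException`, `createSubscriptions`, `updateMetadata`, and `partitionsInMetadata` are hypothetical and only mimic the shape of the methods named in this description. The point is that the conflict branch runs the metadata update before completing the result, instead of completing it immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

public class ForceRetrySketch {
    // Hypothetical stand-in for PulsarAdminException.ConflictException.
    static final class ConflictException extends RuntimeException {
        ConflictException(String msg) {
            super(msg);
        }
    }

    // Stand-in for the partition count stored in the partitioned-topic metadata.
    static final AtomicInteger partitionsInMetadata = new AtomicInteger(2);

    // Stand-in for createSubscriptions() on a retry: the subscriptions already exist.
    static CompletableFuture<Void> createSubscriptions() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(new ConflictException("Subscription already exists"));
        return f;
    }

    // Stand-in for updatePartitionedTopicAsync().
    static CompletableFuture<Void> updateMetadata(int numPartitions) {
        partitionsInMetadata.set(numPartitions);
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> updatePartitions(int numPartitions, boolean force) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        createSubscriptions()
                .thenCompose(__ -> updateMetadata(numPartitions))
                .thenAccept(__ -> result.complete(null))
                .exceptionally(ex -> {
                    // Dependent stages wrap the original failure in CompletionException.
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    if (force && cause instanceof ConflictException) {
                        // Do not stop here: still perform the metadata update.
                        updateMetadata(numPartitions).whenComplete((v, ex2) -> {
                            if (ex2 != null) {
                                result.completeExceptionally(ex2);
                            } else {
                                result.complete(null);
                            }
                        });
                    } else {
                        result.completeExceptionally(cause);
                    }
                    return null;
                });
        return result;
    }

    public static void main(String[] args) {
        updatePartitions(4, true).join();
        System.out.println("partitions in metadata: " + partitionsInMetadata.get());
    }
}
```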
   
   ### Discussion
   
   When 'force=true' is added, the managed-ledger znode clean-up operation seems unnecessary. Can it be removed?
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API: no
     - The schema: no
     - The default values of configurations: no
     - The wire protocol: no
     - The rest endpoints: no
     - The admin cli options: no
     - Anything that affects deployment: no
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `no-need-doc` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-added`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaozhangmin commented on a diff in pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on code in PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#discussion_r853682072


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4052,7 +4052,26 @@ private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int
             return future;
         }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
             if (force && ex.getCause() instanceof PulsarAdminException.ConflictException) {
-                result.complete(null);
+                CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
+                        .updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions));
+                future.thenAccept(__ -> result.complete(null)).exceptionally(ex2 -> {
+                    // If the update operation fails, clean up the partitions that were created
+                    getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
+                        int oldPartition = metadata.partitions;
+                        for (int i = oldPartition; i < numPartitions; i++) {
+                            topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> {
+                                log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName,
+                                        ex1.getCause());
+                                return null;
+                            });
+                        }
+                    }).exceptionally(e -> {
+                        log.warn("[{}] Failed to clean up managedLedger", topicName, e);
+                        return null;
+                    });
+                    result.completeExceptionally(ex2);
+                    return null;
+                });

Review Comment:
   @TakaHiro0208 If `createSubscriptions` fails, there can be a situation in which the subscription znodes under managed-ledgers have already been created. Your deletion here will then fail as well.







[GitHub] [pulsar] TakaHiro0208 commented on pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
TakaHiro0208 commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1102572024

   > @TakaHiro0208 I think the proper way is, `createSubscriptions` should be called after `updatePartitionedTopicAsync` successfully.
   
   Yep, this can also solve the problem and remove redundant code. I can revise the committed code accordingly. But there would be a situation: after `updatePartitionedTopicAsync` succeeds and `createSubscriptions` fails, getPartitionedTopicMetadata would still return the new partition count. Does that matter?




[GitHub] [pulsar] gaozhangmin commented on pull request #15217: [pulsar-broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1102518854

   @TakaHiro0208 I think the proper way is to call `updatePartitionedTopicAsync` before calling `createSubscriptions` on the newly added partitions. The previous order is not right.
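
To compare the two orderings, here is a small synchronous sketch. It is a toy model with hypothetical names (`Outcome`, `update`, `createSubscriptions`), not the broker implementation. It shows what the metadata contains when subscription creation fails under each order, which is the trade-off raised elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class ReorderSketch {
    // Records the partition count in metadata and the steps that ran.
    static final class Outcome {
        int partitionsInMetadata;
        final List<String> steps = new ArrayList<>();

        Outcome(int partitions) {
            this.partitionsInMetadata = partitions;
        }
    }

    static void createSubscriptions(Outcome o, boolean fail) {
        if (fail) {
            throw new IllegalStateException("createSubscriptions");
        }
        o.steps.add("createSubscriptions");
    }

    static Outcome update(int oldPartitions, int newPartitions,
                          boolean metadataFirst, boolean subscriptionsFail) {
        Outcome o = new Outcome(oldPartitions);
        o.steps.add("createPartitions");
        try {
            if (metadataFirst) {
                // Proposed order: metadata update, then subscriptions.
                o.partitionsInMetadata = newPartitions;
                o.steps.add("updateMetadata");
                createSubscriptions(o, subscriptionsFail);
            } else {
                // Previous order: subscriptions, then metadata update.
                createSubscriptions(o, subscriptionsFail);
                o.partitionsInMetadata = newPartitions;
                o.steps.add("updateMetadata");
            }
        } catch (IllegalStateException e) {
            o.steps.add("failed:" + e.getMessage());
        }
        return o;
    }

    public static void main(String[] args) {
        Outcome proposed = update(2, 4, true, true);
        Outcome previous = update(2, 4, false, true);
        System.out.println("proposed order: " + proposed.partitionsInMetadata + " " + proposed.steps);
        System.out.println("previous order: " + previous.partitionsInMetadata + " " + previous.steps);
    }
}
```

With the proposed order, a failure in subscription creation leaves the metadata already pointing at the new partition count; with the previous order, the metadata keeps the old count.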




[GitHub] [pulsar] TakaHiro0208 commented on pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
TakaHiro0208 commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1104818621

   I have updated the code, PTAL, @gaozhangmin @Technoboy- @rdhabalia
   1. Maybe it is not good to call createSubscriptions after updatePartitionedTopicAsync, because we ultimately use getPartitionedTopicMetadata to judge whether the partition update succeeded.
   2. The managed-ledger znode clean-up is not actually effective and not necessary, since we can retry updatePartition with "force=true".
   3. It would be better to add documentation explaining that if updatePartition only partially succeeds, we can retry it with "force=true".




[GitHub] [pulsar] github-actions[bot] commented on pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1133802296

   The pr had no activity for 30 days, mark with Stale label.




[GitHub] [pulsar] TakaHiro0208 commented on pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
TakaHiro0208 commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1102572613

   > > @TakaHiro0208 I think the proper way is, `createSubscriptions` should be called after `updatePartitionedTopicAsync` successfully.
   > 
   > @gaozhangmin Thank you for your review. Yep, this can also solve the problem and remove redundant code, I can revise the committed code following this. But, there would be a situation: after successful `updatePartitionedTopicAsync` and failed `createSubscriptions`, we would still get newPartitions by getPartitionedTopicMetadata. Does it matter??
   
   




[GitHub] [pulsar] TakaHiro0208 commented on a diff in pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
TakaHiro0208 commented on code in PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#discussion_r853886528


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4052,7 +4052,26 @@ private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int
             return future;
         }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
             if (force && ex.getCause() instanceof PulsarAdminException.ConflictException) {
-                result.complete(null);
+                CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
+                        .updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions));
+                future.thenAccept(__ -> result.complete(null)).exceptionally(ex2 -> {
+                    // If the update operation fails, clean up the partitions that were created
+                    getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
+                        int oldPartition = metadata.partitions;
+                        for (int i = oldPartition; i < numPartitions; i++) {
+                            topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> {
+                                log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName,
+                                        ex1.getCause());
+                                return null;
+                            });
+                        }
+                    }).exceptionally(e -> {
+                        log.warn("[{}] Failed to clean up managedLedger", topicName, e);
+                        return null;
+                    });
+                    result.completeExceptionally(ex2);
+                    return null;
+                });

Review Comment:
   > @TakaHiro0208 If `createSubscriptions` failed, there would be a situation which sub Znode under managed-ledgers had created. your deletion here will fail also.
   
   If we use "force=true" to retry updatePartition, there seems to be no need to clean up the znode?





[GitHub] [pulsar] github-actions[bot] commented on pull request #15217: [pulsar-broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1102512985

   @TakaHiro0208:Thanks for providing doc info!




[GitHub] [pulsar] Jason918 commented on pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
Jason918 commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1229148530

   @TakaHiR07 Please rebase the master and resolve the conflicts.




[GitHub] [pulsar] github-actions[bot] commented on pull request #15217: [pulsar-broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1102512761

   @TakaHiro0208:Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#discussion_r852987782


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4052,7 +4052,26 @@ private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int
             return future;
         }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
             if (force && ex.getCause() instanceof PulsarAdminException.ConflictException) {
-                result.complete(null);
+                CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
+                        .updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions));
+                future.thenAccept(__ -> result.complete(null)).exceptionally(ex2 -> {
+                    // If the update operation fails, clean up the partitions that were created
+                    getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
+                        int oldPartition = metadata.partitions;
+                        for (int i = oldPartition; i < numPartitions; i++) {
+                            topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> {
+                                log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName,
+                                        ex1.getCause());
+                                return null;
+                            });
+                        }
+                    }).exceptionally(e -> {
+                        log.warn("[{}] Failed to clean up managedLedger", topicName, e);
+                        return null;
+                    });
+                    result.completeExceptionally(ex2);
+                    return null;
+                });

Review Comment:
   `updatePartitionedTopicAsync` has already handled the exception, so the exception cannot be propagated to this handler.
   BTW, could you help refactor this method?
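
For reference, this is the general `CompletableFuture` behaviour behind the remark, shown as a standalone sketch. Whether `updatePartitionedTopicAsync` actually recovers from its own failure is not confirmed here; `failing` and `downstreamSeesFailure` are hypothetical helpers. If an upstream stage recovers inside its own `exceptionally`, a later `exceptionally` handler never sees the failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExceptionallySketch {
    // A future that has already failed.
    static CompletableFuture<String> failing() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("update failed"));
        return f;
    }

    // Returns true if a downstream exceptionally() handler observed the failure.
    static boolean downstreamSeesFailure(boolean recoverUpstream) {
        AtomicBoolean seen = new AtomicBoolean(false);
        CompletableFuture<String> upstream = failing();
        if (recoverUpstream) {
            // The upstream stage swallows the failure and completes normally.
            upstream = upstream.exceptionally(ex -> "recovered");
        }
        upstream.exceptionally(ex -> {
            seen.set(true);
            return null;
        }).join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("recovered upstream: " + downstreamSeesFailure(true));
        System.out.println("not recovered:      " + downstreamSeesFailure(false));
    }
}
```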





[GitHub] [pulsar] github-actions[bot] commented on pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1179639529

   The pr had no activity for 30 days, mark with Stale label.




[GitHub] [pulsar] TakaHiro0208 commented on a diff in pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
TakaHiro0208 commented on code in PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#discussion_r853017089


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4052,7 +4052,26 @@ private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int
             return future;
         }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
             if (force && ex.getCause() instanceof PulsarAdminException.ConflictException) {
-                result.complete(null);
+                CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
+                        .updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions));
+                future.thenAccept(__ -> result.complete(null)).exceptionally(ex2 -> {
+                    // If the update operation fails, clean up the partitions that were created
+                    getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
+                        int oldPartition = metadata.partitions;
+                        for (int i = oldPartition; i < numPartitions; i++) {
+                            topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> {
+                                log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName,
+                                        ex1.getCause());
+                                return null;
+                            });
+                        }
+                    }).exceptionally(e -> {
+                        log.warn("[{}] Failed to clean up managedLedger", topicName, e);
+                        return null;
+                    });
+                    result.completeExceptionally(ex2);
+                    return null;
+                });

Review Comment:
   I think it does not handle the exception, since the code at lines 4033-4035 also delegates the exception. And I think I can help refactor this method.





[GitHub] [pulsar] TakaHiR07 closed pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
TakaHiR07 closed pull request #15217: [fix][broker] Fix update topic partitions failed
URL: https://github.com/apache/pulsar/pull/15217




[GitHub] [pulsar] TakaHiR07 commented on pull request #15217: [fix][broker] Fix update topic partitions failed

Posted by GitBox <gi...@apache.org>.
TakaHiR07 commented on PR #15217:
URL: https://github.com/apache/pulsar/pull/15217#issuecomment-1232847520

   PR #17251 uses a similar method to solve the update-partition problem, so I am closing this one.

