You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/09/05 23:02:41 UTC

[pulsar] branch master updated: Improve batch source intermediate topic cleanup (#7985)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d45ee07  Improve batch source intermediate topic cleanup (#7985)
d45ee07 is described below

commit d45ee07fe2bac6fe03d62895b497bfee0807bb8d
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Sep 5 16:02:16 2020 -0700

    Improve batch source intermediate topic cleanup (#7985)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../java/org/apache/pulsar/functions/worker/FunctionActioner.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index a79138d..c75fb3a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -422,7 +422,7 @@ public class FunctionActioner {
     private Supplier<Actions.ActionResult> getDeleteTopicSupplier(String topic) {
         return () -> {
             try {
-                pulsarAdmin.topics().delete(topic);
+                pulsarAdmin.topics().delete(topic, true);
             } catch (PulsarAdminException e) {
                 if (e instanceof PulsarAdminException.NotFoundException) {
                     return Actions.ActionResult.builder()
@@ -628,19 +628,21 @@ public class FunctionActioner {
             try {
                 Actions.newBuilder()
                   .addAction(
+                    // Unsubscribe and allow time for consumers to close
                     Actions.Action.builder()
                       .actionName(String.format("Removing intermediate topic subscription %s for Batch Source %s",
                         intermediateTopicSubscription, fqfn))
                       .numRetries(10)
                       .sleepBetweenInvocationsMs(1000)
-                      .continueOn(true)
                       .supplier(
                         getDeleteSubscriptionSupplier(intermediateTopicName,
                           false,
                           intermediateTopicSubscription)
                       )
                       .build())
-                  .addAction(Actions.Action.builder()
+                  .addAction(
+                    // Delete topic forcibly regardless whether unsubscribe succeeded or not
+                    Actions.Action.builder()
                     .actionName(String.format("Deleting intermediate topic %s for Batch Source %s",
                       intermediateTopicName, fqfn))
                     .numRetries(10)