You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/09/05 23:02:41 UTC
[pulsar] branch master updated: Improve batch source intermediate topic cleanup (#7985)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d45ee07 Improve batch source intermediate topic cleanup (#7985)
d45ee07 is described below
commit d45ee07fe2bac6fe03d62895b497bfee0807bb8d
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Sep 5 16:02:16 2020 -0700
Improve batch source intermediate topic cleanup (#7985)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../java/org/apache/pulsar/functions/worker/FunctionActioner.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index a79138d..c75fb3a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -422,7 +422,7 @@ public class FunctionActioner {
private Supplier<Actions.ActionResult> getDeleteTopicSupplier(String topic) {
return () -> {
try {
- pulsarAdmin.topics().delete(topic);
+ pulsarAdmin.topics().delete(topic, true);
} catch (PulsarAdminException e) {
if (e instanceof PulsarAdminException.NotFoundException) {
return Actions.ActionResult.builder()
@@ -628,19 +628,21 @@ public class FunctionActioner {
try {
Actions.newBuilder()
.addAction(
+ // Unsubscribe and allow time for consumers to close
Actions.Action.builder()
.actionName(String.format("Removing intermediate topic subscription %s for Batch Source %s",
intermediateTopicSubscription, fqfn))
.numRetries(10)
.sleepBetweenInvocationsMs(1000)
- .continueOn(true)
.supplier(
getDeleteSubscriptionSupplier(intermediateTopicName,
false,
intermediateTopicSubscription)
)
.build())
- .addAction(Actions.Action.builder()
+ .addAction(
+ // Delete topic forcibly regardless whether unsubscribe succeeded or not
+ Actions.Action.builder()
.actionName(String.format("Deleting intermediate topic %s for Batch Source %s",
intermediateTopicName, fqfn))
.numRetries(10)