You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/13 03:09:07 UTC

[pulsar] branch branch-2.11 updated: [fix][broker] Fix potential can't remove producer/reader reference (#17559)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 862bf0b8c2e [fix][broker] Fix potential can't remove producer/reader reference (#17559)
862bf0b8c2e is described below

commit 862bf0b8c2e9449407ba16aaa6ef42f07f4a3627
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Sep 13 11:04:07 2022 +0800

    [fix][broker] Fix potential can't remove producer/reader reference (#17559)
---
 .../broker/systopic/SystemTopicClientBase.java     |  8 ++--
 .../systopic/TopicPoliciesSystemTopicClient.java   | 48 +++++++++++++++-------
 2 files changed, 37 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java
index 65dfa0e19af..cb45077341d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java
@@ -56,9 +56,9 @@ public abstract class SystemTopicClientBase<T> implements SystemTopicClient<T> {
 
     @Override
     public CompletableFuture<Reader<T>> newReaderAsync() {
-        return newReaderAsyncInternal().thenCompose(reader -> {
+        return newReaderAsyncInternal().thenApply(reader -> {
             readers.add(reader);
-            return CompletableFuture.completedFuture(reader);
+            return reader;
         });
     }
 
@@ -73,9 +73,9 @@ public abstract class SystemTopicClientBase<T> implements SystemTopicClient<T> {
 
     @Override
     public CompletableFuture<Writer<T>> newWriterAsync() {
-        return newWriterAsyncInternal().thenCompose(writer -> {
+        return newWriterAsyncInternal().thenApply(writer -> {
             writers.add(writer);
-            return CompletableFuture.completedFuture(writer);
+            return writer;
         });
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index 84b8bd636bd..4ad2137d80b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.systopic;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -48,12 +50,12 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
         return client.newProducer(Schema.AVRO(PulsarEvent.class))
                 .topic(topicName.toString())
                 .enableBatching(false)
-                .createAsync().thenCompose(producer -> {
+                .createAsync()
+                .thenApply(producer -> {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] A new writer is created", topicName);
                     }
-                    return CompletableFuture.completedFuture(new TopicPolicyWriter(producer,
-                            TopicPoliciesSystemTopicClient.this));
+                    return new TopicPolicyWriter(producer, TopicPoliciesSystemTopicClient.this);
                 });
     }
 
@@ -62,13 +64,13 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
         return client.newReader(Schema.AVRO(PulsarEvent.class))
                 .topic(topicName.toString())
                 .startMessageId(MessageId.earliest)
-                .readCompacted(true).createAsync()
-                .thenCompose(reader -> {
+                .readCompacted(true)
+                .createAsync()
+                .thenApply(reader -> {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] A new reader is created", topicName);
                     }
-                    return CompletableFuture.completedFuture(new TopicPolicyReader(reader,
-                            TopicPoliciesSystemTopicClient.this));
+                    return new TopicPolicyReader(reader, TopicPoliciesSystemTopicClient.this);
                 });
     }
 
@@ -121,15 +123,23 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
 
         @Override
         public void close() throws IOException {
-            this.producer.close();
-            systemTopicClient.getWriters().remove(TopicPolicyWriter.this);
+            try {
+                closeAsync().get();
+            } catch (ExecutionException e) {
+                if (e.getCause() instanceof IOException) {
+                    throw (IOException) e.getCause();
+                } else {
+                    throw new PulsarServerException(e.getCause());
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
         }
 
         @Override
         public CompletableFuture<Void> closeAsync() {
-            return producer.closeAsync().thenCompose(v -> {
+            return producer.closeAsync().whenComplete((r, ex) -> {
                 systemTopicClient.getWriters().remove(TopicPolicyWriter.this);
-                return CompletableFuture.completedFuture(null);
             });
         }
 
@@ -184,15 +194,23 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
 
         @Override
         public void close() throws IOException {
-            this.reader.close();
-            systemTopic.getReaders().remove(TopicPolicyReader.this);
+            try {
+                closeAsync().get();
+            } catch (ExecutionException e) {
+                if (e.getCause() instanceof IOException) {
+                    throw (IOException) e.getCause();
+                } else {
+                    throw new PulsarServerException(e.getCause());
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
         }
 
         @Override
         public CompletableFuture<Void> closeAsync() {
-            return reader.closeAsync().thenCompose(v -> {
+            return reader.closeAsync().whenComplete((r, ex) -> {
                 systemTopic.getReaders().remove(TopicPolicyReader.this);
-                return CompletableFuture.completedFuture(null);
             });
         }