You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/13 03:09:07 UTC
[pulsar] branch branch-2.11 updated: [fix][broker] Fix potential can't remove producer/reader reference (#17559)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 862bf0b8c2e [fix][broker] Fix potential can't remove producer/reader reference (#17559)
862bf0b8c2e is described below
commit 862bf0b8c2e9449407ba16aaa6ef42f07f4a3627
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Sep 13 11:04:07 2022 +0800
[fix][broker] Fix potential can't remove producer/reader reference (#17559)
---
.../broker/systopic/SystemTopicClientBase.java | 8 ++--
.../systopic/TopicPoliciesSystemTopicClient.java | 48 +++++++++++++++-------
2 files changed, 37 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java
index 65dfa0e19af..cb45077341d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClientBase.java
@@ -56,9 +56,9 @@ public abstract class SystemTopicClientBase<T> implements SystemTopicClient<T> {
@Override
public CompletableFuture<Reader<T>> newReaderAsync() {
- return newReaderAsyncInternal().thenCompose(reader -> {
+ return newReaderAsyncInternal().thenApply(reader -> {
readers.add(reader);
- return CompletableFuture.completedFuture(reader);
+ return reader;
});
}
@@ -73,9 +73,9 @@ public abstract class SystemTopicClientBase<T> implements SystemTopicClient<T> {
@Override
public CompletableFuture<Writer<T>> newWriterAsync() {
- return newWriterAsyncInternal().thenCompose(writer -> {
+ return newWriterAsyncInternal().thenApply(writer -> {
writers.add(writer);
- return CompletableFuture.completedFuture(writer);
+ return writer;
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index 84b8bd636bd..4ad2137d80b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.systopic;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -48,12 +50,12 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
return client.newProducer(Schema.AVRO(PulsarEvent.class))
.topic(topicName.toString())
.enableBatching(false)
- .createAsync().thenCompose(producer -> {
+ .createAsync()
+ .thenApply(producer -> {
if (log.isDebugEnabled()) {
log.debug("[{}] A new writer is created", topicName);
}
- return CompletableFuture.completedFuture(new TopicPolicyWriter(producer,
- TopicPoliciesSystemTopicClient.this));
+ return new TopicPolicyWriter(producer, TopicPoliciesSystemTopicClient.this);
});
}
@@ -62,13 +64,13 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
return client.newReader(Schema.AVRO(PulsarEvent.class))
.topic(topicName.toString())
.startMessageId(MessageId.earliest)
- .readCompacted(true).createAsync()
- .thenCompose(reader -> {
+ .readCompacted(true)
+ .createAsync()
+ .thenApply(reader -> {
if (log.isDebugEnabled()) {
log.debug("[{}] A new reader is created", topicName);
}
- return CompletableFuture.completedFuture(new TopicPolicyReader(reader,
- TopicPoliciesSystemTopicClient.this));
+ return new TopicPolicyReader(reader, TopicPoliciesSystemTopicClient.this);
});
}
@@ -121,15 +123,23 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
@Override
public void close() throws IOException {
- this.producer.close();
- systemTopicClient.getWriters().remove(TopicPolicyWriter.this);
+ try {
+ closeAsync().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new PulsarServerException(e.getCause());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
@Override
public CompletableFuture<Void> closeAsync() {
- return producer.closeAsync().thenCompose(v -> {
+ return producer.closeAsync().whenComplete((r, ex) -> {
systemTopicClient.getWriters().remove(TopicPolicyWriter.this);
- return CompletableFuture.completedFuture(null);
});
}
@@ -184,15 +194,23 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
@Override
public void close() throws IOException {
- this.reader.close();
- systemTopic.getReaders().remove(TopicPolicyReader.this);
+ try {
+ closeAsync().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new PulsarServerException(e.getCause());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
@Override
public CompletableFuture<Void> closeAsync() {
- return reader.closeAsync().thenCompose(v -> {
+ return reader.closeAsync().whenComplete((r, ex) -> {
systemTopic.getReaders().remove(TopicPolicyReader.this);
- return CompletableFuture.completedFuture(null);
});
}