You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/06 10:18:57 UTC
[pulsar] branch branch-2.11 updated: [improve][broker] Using `handle` instead of `handleAsync` to avoid using common pool thread (#17403)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 620d9ce5d48 [improve][broker] Using `handle` instead of `handleAsync` to avoid using common pool thread (#17403)
620d9ce5d48 is described below
commit 620d9ce5d4807f4fba934fb75e980b01bfcd0dae
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Sep 4 00:24:40 2022 +0800
[improve][broker] Using `handle` instead of `handleAsync` to avoid using common pool thread (#17403)
* Use `handle` instead of `handleAsync` to avoid running the callback on the common pool thread.
* Fix a deadlock.
(cherry picked from commit 596d6995ccbd4e51fd86dd34c6019f88a6d8109e)
---
.../service/schema/BookkeeperSchemaStorage.java | 24 ++++++----------------
1 file changed, 6 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 9094f94eb80..d538a89ff07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -241,9 +241,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
if (log.isDebugEnabled()) {
log.debug("[{}] Fetching schema from store", schemaId);
}
- CompletableFuture<StoredSchema> future = new CompletableFuture<>();
-
- getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
+ return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Got schema locator {}", schemaId, locator);
}
@@ -256,22 +254,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
return readSchemaEntry(schemaLocator.getInfo().getPosition())
.thenApply(entry -> new StoredSchema(entry.getSchemaData().toByteArray(),
new LongSchemaVersion(schemaLocator.getInfo().getVersion())));
- }).handleAsync((res, ex) -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Get operation completed. res={} -- ex={}", schemaId, res, ex);
- }
-
- // Cleanup the pending ops from the map
- readSchemaOperations.remove(schemaId, future);
- if (ex != null) {
- future.completeExceptionally(ex);
- } else {
- future.complete(res);
- }
- return null;
});
-
- return future;
+ }).whenComplete((res, ex) -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Get operation completed. res={} -- ex={}", schemaId, res, ex);
+ }
+ readSchemaOperations.remove(schemaId);
});
}