You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/06 10:18:57 UTC

[pulsar] branch branch-2.11 updated: [improve][broker] Using `handle` instead of `handleAsync` to avoid using common pool thread (#17403)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 620d9ce5d48 [improve][broker] Using `handle` instead of `handleAsync` to avoid using common pool thread (#17403)
620d9ce5d48 is described below

commit 620d9ce5d4807f4fba934fb75e980b01bfcd0dae
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Sep 4 00:24:40 2022 +0800

    [improve][broker] Using `handle` instead of `handleAsync` to avoid using common pool thread (#17403)
    
    * Using `handle` instead of `handleAsync` to avoid using common pool thread.
    
    * fix deadlock.
    
    (cherry picked from commit 596d6995ccbd4e51fd86dd34c6019f88a6d8109e)
---
 .../service/schema/BookkeeperSchemaStorage.java    | 24 ++++++----------------
 1 file changed, 6 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 9094f94eb80..d538a89ff07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -241,9 +241,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Fetching schema from store", schemaId);
             }
-            CompletableFuture<StoredSchema> future = new CompletableFuture<>();
-
-            getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
+            return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Got schema locator {}", schemaId, locator);
                 }
@@ -256,22 +254,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                 return readSchemaEntry(schemaLocator.getInfo().getPosition())
                         .thenApply(entry -> new StoredSchema(entry.getSchemaData().toByteArray(),
                                 new LongSchemaVersion(schemaLocator.getInfo().getVersion())));
-            }).handleAsync((res, ex) -> {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Get operation completed. res={} -- ex={}", schemaId, res, ex);
-                }
-
-                // Cleanup the pending ops from the map
-                readSchemaOperations.remove(schemaId, future);
-                if (ex != null) {
-                    future.completeExceptionally(ex);
-                } else {
-                    future.complete(res);
-                }
-                return null;
             });
-
-            return future;
+        }).whenComplete((res, ex) -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Get operation completed. res={} -- ex={}", schemaId, res, ex);
+            }
+            readSchemaOperations.remove(schemaId);
         });
     }