Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/26 05:53:32 UTC

[pulsar] branch master updated: Fix: web-thread stuck on function metadata update (#2634)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3da0c98  Fix: web-thread stuck on function metadata update (#2634)
3da0c98 is described below

commit 3da0c9844dc5b8bf59df42e3a95bc618a1c7cf32
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Sep 25 22:53:27 2018 -0700

    Fix: web-thread stuck on function metadata update (#2634)
    
    ### Motivation
    
    If the function metadata producer is disconnected from the broker, the admin-API thread handling function registration gets stuck:
    ```
    sun.misc.Unsafe.park(boolean, long) @bci=0 (Compiled frame; information may be imprecise)
     - java.util.concurrent.locks.LockSupport.park(java.lang.Object) @bci=14, line=175 (Interpreted frame)
     - java.util.concurrent.CompletableFuture$Signaller.block() @bci=19, line=1693 (Interpreted frame)
     - java.util.concurrent.ForkJoinPool.managedBlock(java.util.concurrent.ForkJoinPool$ManagedBlocker) @bci=119, line=3323 (Interpreted frame)
     - java.util.concurrent.CompletableFuture.waitingGet(boolean) @bci=129, line=1729 (Interpreted frame)
     - java.util.concurrent.CompletableFuture.get() @bci=11, line=1895 (Interpreted frame)
     - org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.updateRequest(org.apache.pulsar.functions.proto.Function$FunctionMetaData) @bci=18, line=562 (Interpreted frame)
     - org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(java.lang.String, java.lang.String, java.lang.String, java.io.InputStream, org.glassfish.jersey.media.multipart.FormDataContentDisposition, java.lang.String, java.lang.String, java.lang.String) @bci=461, line=190 (Interpreted frame)
     - org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource.registerFunction(java.lang.String, java.lang.String, java.lang.String, java.io.InputStream, org.glassfi
    ```
    
    
    ### Modifications
    
    Exceptionally complete the future on which the function admin-API thread is waiting when publishing the function metadata fails.
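    
    The pattern behind this change can be sketched in isolation as follows. This is a minimal illustration, not the actual worker code: the class `FailurePropagationSketch`, the methods `submit` and `waitForResult`, and the one-second timeout are hypothetical names and values chosen for the example. Only the idea (hook `exceptionally` onto the publish future and fail the future the caller blocks on) comes from the diff below.
    
    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class FailurePropagationSketch {
    
        // Hypothetical stand-in for submitting a request: returns the future
        // that the calling (admin-API) thread will block on.
        public static CompletableFuture<String> submit(CompletableFuture<Long> publishFuture) {
            CompletableFuture<String> resultFuture = new CompletableFuture<>();
    
            // On the success path some other component would complete
            // resultFuture; that part is omitted from this sketch.
    
            // If the publish fails, fail the result future too, so the
            // waiting thread is released instead of parking forever.
            publishFuture.exceptionally(ex -> {
                resultFuture.completeExceptionally(
                        new RuntimeException("Failed to submit function metadata", ex));
                return null;
            });
            return resultFuture;
        }
    
        // Hypothetical caller: waits for the result and reports a failure as a string.
        public static String waitForResult(CompletableFuture<String> future) {
            try {
                return future.get(1, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                return "failed: " + e.getCause().getMessage();
            } catch (InterruptedException | TimeoutException e) {
                throw new IllegalStateException(e);
            }
        }
    
        public static void main(String[] args) {
            CompletableFuture<Long> publishFuture = new CompletableFuture<>();
            CompletableFuture<String> result = submit(publishFuture);
    
            // Simulate the producer being disconnected from the broker.
            publishFuture.completeExceptionally(new IllegalStateException("producer disconnected"));
    
            // Prints "failed: Failed to submit function metadata".
            System.out.println(waitForResult(result));
        }
    }
    ```
    
    Without the `exceptionally` hook, `resultFuture` in this sketch would never complete after a failed publish, and an unbounded `get()` on it would block indefinitely, which matches the stack trace above.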
    
    ### Result
    
    Function-worker threads no longer get stuck while registering function metadata.
---
 .../pulsar/functions/worker/FunctionMetaDataManager.java       | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 9de9226..44ff807 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -399,6 +400,15 @@ public class FunctionMetaDataManager implements AutoCloseable {
         serviceRequestInfo.setRequestResultCompletableFuture(requestResultCompletableFuture);
 
         this.pendingServiceRequests.put(serviceRequestInfo.getServiceRequest().getRequestId(), serviceRequestInfo);
+        
+        messageIdCompletableFuture.exceptionally(ex -> {
+            FunctionDetails metadata = serviceRequest.getFunctionMetaData().getFunctionDetails();
+            log.warn("Failed to submit function metadata for {}/{}/{}-{}", metadata.getTenant(),
+                    metadata.getNamespace(), metadata.getName(), ex.getMessage());
+            serviceRequestInfo.getRequestResultCompletableFuture()
+                    .completeExceptionally(new RuntimeException("Failed to submit function metadata"));
+            return null;
+        });
 
         return requestResultCompletableFuture;
     }