You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/09 15:07:58 UTC

[pulsar] branch master updated: Closing FunctionAssignmentTailer should handle ConsumerAlreadyClosedException (#3747)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 73595c9  Closing FunctionAssignmentTailer should handle ConsumerAlreadyClosedException (#3747)
73595c9 is described below

commit 73595c944328b16760afd2182503862d0655f58c
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Tue Apr 9 23:07:53 2019 +0800

    Closing FunctionAssignmentTailer should handle ConsumerAlreadyClosedException (#3747)
    
    *Motivation*
    
    Currently FunctionAssignmentTailer throws a RuntimeException whenever it receives
    an exception while tailing the assignment topic. Such an exception can also be
    thrown when closing the reader.
    
    This runtime exception is the root cause of PulsarWorkerAssignmentTest timing out
    during shutdown.
    
    *Modifications*
    
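    Handle AlreadyClosedException differently: ignore it when the tailer itself has
    already been closed, and still fail when the reader is closed unexpectedly.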
---
 .../org/apache/pulsar/common/util/FutureUtil.java  |  9 ++++++
 .../functions/worker/FunctionAssignmentTailer.java | 37 ++++++++++++++++------
 2 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 0aa6a96..e83167a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 public class FutureUtil {
 
@@ -38,4 +39,12 @@ public class FutureUtil {
         future.completeExceptionally(t);
         return future;
     }
+
+    public static Throwable unwrapCompletionException(Throwable t) {
+        if (t instanceof CompletionException) {
+            return unwrapCompletionException(t.getCause());
+        } else {
+            return t;
+        }
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index b40c6cd..bac3128 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -22,8 +22,9 @@ import java.io.IOException;
 import java.util.function.Function;
 
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 
 import lombok.extern.slf4j.Slf4j;
@@ -32,13 +33,12 @@ import lombok.extern.slf4j.Slf4j;
 public class FunctionAssignmentTailer
     implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
 
-        private final FunctionRuntimeManager functionRuntimeManager;
-        private final Reader<byte[]> reader;
+    private final FunctionRuntimeManager functionRuntimeManager;
+    private final Reader<byte[]> reader;
+    private boolean closed = false;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader)
-            throws PulsarClientException {
+    public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) {
         this.functionRuntimeManager = functionRuntimeManager;
-
         this.reader = reader;
     }
 
@@ -54,8 +54,12 @@ public class FunctionAssignmentTailer
 
     @Override
     public void close() {
+        if (closed) {
+            return;
+        }
         log.info("Stopping function state consumer");
         try {
+            closed = true;
             reader.close();
         } catch (IOException e) {
             log.error("Failed to stop function state consumer", e);
@@ -91,8 +95,23 @@ public class FunctionAssignmentTailer
 
     @Override
     public Void apply(Throwable cause) {
-        log.error("Failed to retrieve messages from assignment update topic", cause);
-        // TODO: find a better way to handle consumer functions
-        throw new RuntimeException(cause);
+        Throwable realCause = FutureUtil.unwrapCompletionException(cause);
+        if (realCause instanceof AlreadyClosedException) {
+            // if reader is closed because tailer is closed, ignore the exception
+            if (closed) {
+                // ignore
+                return null;
+            } else {
+                log.error("Reader of assignment update topic is closed unexpectedly", cause);
+                throw new RuntimeException(
+                    "Reader of assignment update topic is closed unexpectedly",
+                    cause
+                );
+            }
+        } else {
+            log.error("Failed to retrieve messages from assignment update topic", cause);
+            // TODO: find a better way to handle consumer functions
+            throw new RuntimeException(cause);
+        }
     }
 }