You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/11 21:15:47 UTC

[pulsar] branch master updated: [fix][client] Thread.currentThread.interrupt() after catching InterruptedException (#10163)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b7ada4f770 [fix][client] Thread.currentThread.interrupt() after catching InterruptedException (#10163)
4b7ada4f770 is described below

commit 4b7ada4f77020ed4c0b158daf19f6c0675b33776
Author: yingchang <zj...@163.com>
AuthorDate: Mon Dec 12 05:15:35 2022 +0800

    [fix][client] Thread.currentThread.interrupt() after catching InterruptedException (#10163)
---
 .../org/apache/pulsar/client/api/Consumer.java     |  2 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  4 ++
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 ++
 .../pulsar/common/util/ExceptionHandler.java       | 48 ++++++++++++++++++++++
 4 files changed, 58 insertions(+)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 3fbab236a60..9a3ef7833df 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -77,6 +77,8 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger {
      *
      * <p>This calls blocks until a message is available.
      *
+     * <p>When thread is Interrupted, return a null value and reset interrupted flag.
+     *
      * @return the received message
      * @throws PulsarClientException.AlreadyClosedException
      *             if the consumer was already closed
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f4bd3a6bb6e..2909a62933e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -108,6 +108,7 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
+import org.apache.pulsar.common.util.ExceptionHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
@@ -455,6 +456,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             messageProcessed(message);
             return beforeConsume(message);
         } catch (InterruptedException e) {
+            ExceptionHandler.handleInterruptedException(e);
             stats.incrementNumReceiveFailed();
             throw PulsarClientException.unwrap(e);
         }
@@ -493,6 +495,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             messageProcessed(message);
             return beforeConsume(message);
         } catch (InterruptedException e) {
+            ExceptionHandler.handleInterruptedException(e);
             State state = getState();
             if (state != State.Closing && state != State.Closed) {
                 stats.incrementNumReceiveFailed();
@@ -508,6 +511,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         try {
             return internalBatchReceiveAsync().get();
         } catch (InterruptedException | ExecutionException e) {
+            ExceptionHandler.handleInterruptedException(e);
             State state = getState();
             if (state != State.Closing && state != State.Closed) {
                 stats.incrementNumBatchReceiveFailed();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 48ccd474af7..32431415a6c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
+import org.apache.pulsar.common.util.ExceptionHandler;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -367,6 +368,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             resumeReceivingFromPausedConsumersIfNeeded();
             return message;
         } catch (Exception e) {
+            ExceptionHandler.handleInterruptedException(e);
             throw PulsarClientException.unwrap(e);
         }
     }
@@ -387,6 +389,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             resumeReceivingFromPausedConsumersIfNeeded();
             return message;
         } catch (Exception e) {
+            ExceptionHandler.handleInterruptedException(e);
             throw PulsarClientException.unwrap(e);
         }
     }
@@ -396,6 +399,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         try {
             return internalBatchReceiveAsync().get();
         } catch (InterruptedException | ExecutionException e) {
+            ExceptionHandler.handleInterruptedException(e);
             State state = getState();
             if (state != State.Closing && state != State.Closed) {
                 stats.incrementNumBatchReceiveFailed();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ExceptionHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ExceptionHandler.java
new file mode 100644
index 00000000000..3f84904aa19
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ExceptionHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Utility class holding static helpers for special-case handling of caught exceptions.
+ */
+public class ExceptionHandler {
+
+    /**
+     * Private constructor: this utility class only has static members and should not be instantiated.
+     */
+    private ExceptionHandler() {
+    }
+
+    /**
+     * Sets the current thread's interrupt flag if the throwable is an {@link InterruptedException},
+     * or an {@link ExecutionException} wrapping one (nested ExecutionExceptions are unwrapped recursively).
+     * Any other throwable, including {@code null}, is ignored, so this can be called from any catch block.
+     *
+     * @param throwable the throwable being caught
+     */
+    public static void handleInterruptedException(Throwable throwable) {
+        if (throwable instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        } else if (throwable instanceof ExecutionException) {
+            handleInterruptedException(throwable.getCause());
+        }
+    }
+}