You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/11 21:15:47 UTC
[pulsar] branch master updated: [fix][client] Thread.currentThread.interrupt() after catching InterruptedException (#10163)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4b7ada4f770 [fix][client] Thread.currentThread.interrupt() after catching InterruptedException (#10163)
4b7ada4f770 is described below
commit 4b7ada4f77020ed4c0b158daf19f6c0675b33776
Author: yingchang <zj...@163.com>
AuthorDate: Mon Dec 12 05:15:35 2022 +0800
[fix][client] Thread.currentThread.interrupt() after catching InterruptedException (#10163)
---
.../org/apache/pulsar/client/api/Consumer.java | 2 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 ++
.../client/impl/MultiTopicsConsumerImpl.java | 4 ++
.../pulsar/common/util/ExceptionHandler.java | 48 ++++++++++++++++++++++
4 files changed, 58 insertions(+)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 3fbab236a60..9a3ef7833df 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -77,6 +77,8 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger {
*
* <p>This calls blocks until a message is available.
*
+ * <p>When thread is Interrupted, return a null value and reset interrupted flag.
+ *
* @return the received message
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f4bd3a6bb6e..2909a62933e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -108,6 +108,7 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
+import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
@@ -455,6 +456,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
messageProcessed(message);
return beforeConsume(message);
} catch (InterruptedException e) {
+ ExceptionHandler.handleInterruptedException(e);
stats.incrementNumReceiveFailed();
throw PulsarClientException.unwrap(e);
}
@@ -493,6 +495,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
messageProcessed(message);
return beforeConsume(message);
} catch (InterruptedException e) {
+ ExceptionHandler.handleInterruptedException(e);
State state = getState();
if (state != State.Closing && state != State.Closed) {
stats.incrementNumReceiveFailed();
@@ -508,6 +511,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
try {
return internalBatchReceiveAsync().get();
} catch (InterruptedException | ExecutionException e) {
+ ExceptionHandler.handleInterruptedException(e);
State state = getState();
if (state != State.Closing && state != State.Closed) {
stats.incrementNumBatchReceiveFailed();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 48ccd474af7..32431415a6c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
+import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -367,6 +368,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (Exception e) {
+ ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
}
}
@@ -387,6 +389,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (Exception e) {
+ ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
}
}
@@ -396,6 +399,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
try {
return internalBatchReceiveAsync().get();
} catch (InterruptedException | ExecutionException e) {
+ ExceptionHandler.handleInterruptedException(e);
State state = getState();
if (state != State.Closing && state != State.Closed) {
stats.incrementNumBatchReceiveFailed();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ExceptionHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ExceptionHandler.java
new file mode 100644
index 00000000000..3f84904aa19
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ExceptionHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Util class to place some special handling of exceptions.
+ */
+public class ExceptionHandler {
+
+ /**
+ * This utility class should not be instantiated.
+ */
+ private ExceptionHandler() {
+ }
+
+ /**
+ * If the throwable is InterruptedException, reset the thread interrupted flag.
+ * We can use it in catching block when we need catch the InterruptedException
+ * and reset the thread interrupted flag no matter whether the throwable being caught is InterruptedException.
+ *
+ * @param throwable the throwable being caught
+ */
+ public static void handleInterruptedException(Throwable throwable) {
+ if (throwable instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ } else if (throwable instanceof ExecutionException) {
+ handleInterruptedException(throwable.getCause());
+ }
+ }
+}