You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2019/06/18 08:55:49 UTC

[pulsar] branch master updated: Do not strip ExecutionException from the stack trace (#4493)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 571b684  Do not strip ExecutionException from the stack trace (#4493)
571b684 is described below

commit 571b68464e25e8e71d3d278f58ba3a3df0c67a5b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jun 18 01:55:43 2019 -0700

    Do not strip ExecutionException from the stack trace (#4493)
    
    * Do not strip ExecutionException from the stack trace
---
 .../pulsar/client/api/PulsarClientException.java   | 85 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 48 ++----------
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 13 +---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 19 ++---
 .../client/impl/MultiTopicsConsumerImpl.java       | 22 ++----
 .../apache/pulsar/client/impl/ProducerBase.java    | 37 ++--------
 .../pulsar/client/impl/ProducerBuilderImpl.java    | 13 +---
 .../pulsar/client/impl/PulsarClientImpl.java       | 15 +---
 .../pulsar/client/impl/ReaderBuilderImpl.java      | 13 +---
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  5 +-
 .../client/impl/auth/AuthenticationBasic.java      |  2 +-
 .../pulsar/functions/instance/ContextImpl.java     | 12 +--
 12 files changed, 130 insertions(+), 154 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 7aa72e1..bb681ad 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Base type of exception thrown by Pulsar client
@@ -62,12 +63,20 @@ public class PulsarClientException extends IOException {
     }
 
     public static class TimeoutException extends PulsarClientException {
+        public TimeoutException(Throwable t) {
+            super(t);
+        }
+
         public TimeoutException(String msg) {
             super(msg);
         }
     }
 
     public static class IncompatibleSchemaException extends PulsarClientException {
+        public IncompatibleSchemaException(Throwable t) {
+            super(t);
+        }
+
         public IncompatibleSchemaException(String msg) {
             super(msg);
         }
@@ -86,6 +95,10 @@ public class PulsarClientException extends IOException {
     }
 
     public static class ConnectException extends PulsarClientException {
+        public ConnectException(Throwable t) {
+            super(t);
+        }
+
         public ConnectException(String msg) {
             super(msg);
         }
@@ -212,4 +225,76 @@ public class PulsarClientException extends IOException {
             super(msg);
         }
     }
+
+    /**
+     * Converts a failure caught around a blocking client call into the {@link PulsarClientException} to throw.
+     *
+     * <ul>
+     * <li>A {@link PulsarClientException} is returned unchanged.</li>
+     * <li>A {@link RuntimeException} is rethrown as-is rather than returned.</li>
+     * <li>An {@link InterruptedException} restores the calling thread's interrupt flag and is wrapped.</li>
+     * <li>An {@link ExecutionException} whose cause is one of the specific exception types tested below yields
+     * a new instance of that same type, built from the cause's message (except {@code NotConnectedException},
+     * which is built without arguments). The instance is created here, so its stack trace includes the
+     * application calling site.</li>
+     * <li>Anything else, including an {@link ExecutionException} without a recognized cause, is wrapped in a
+     * generic {@link PulsarClientException}.</li>
+     * </ul>
+     *
+     * @param t the failure to convert
+     * @return the exception the caller should throw
+     * @throws RuntimeException if {@code t} is itself a {@link RuntimeException}
+     */
+    public static PulsarClientException unwrap(Throwable t) {
+        if (t instanceof PulsarClientException) {
+            return (PulsarClientException) t;
+        } else if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        } else if (t instanceof InterruptedException) {
+            // Must be tested before the generic branch below: an InterruptedException is never an
+            // ExecutionException, so the generic branch would otherwise swallow it without restoring the flag.
+            Thread.currentThread().interrupt();
+            return new PulsarClientException(t);
+        } else if (!(t instanceof ExecutionException)) {
+            // Generic exception
+            return new PulsarClientException(t);
+        }
+
+        // Unwrap the exception to keep the same exception type but a stack trace that includes the application calling
+        // site
+        Throwable cause = t.getCause();
+        if (cause == null) {
+            // An ExecutionException may carry no cause; wrap it as-is instead of failing on cause.getMessage()
+            return new PulsarClientException(t);
+        }
+        String msg = cause.getMessage();
+        if (cause instanceof TimeoutException) {
+            return new TimeoutException(msg);
+        } else if (cause instanceof InvalidConfigurationException) {
+            return new InvalidConfigurationException(msg);
+        } else if (cause instanceof AuthenticationException) {
+            return new AuthenticationException(msg);
+        } else if (cause instanceof IncompatibleSchemaException) {
+            return new IncompatibleSchemaException(msg);
+        } else if (cause instanceof TooManyRequestsException) {
+            return new TooManyRequestsException(msg);
+        } else if (cause instanceof LookupException) {
+            return new LookupException(msg);
+        } else if (cause instanceof ConnectException) {
+            return new ConnectException(msg);
+        } else if (cause instanceof AlreadyClosedException) {
+            return new AlreadyClosedException(msg);
+        } else if (cause instanceof TopicTerminatedException) {
+            return new TopicTerminatedException(msg);
+        } else if (cause instanceof AuthorizationException) {
+            return new AuthorizationException(msg);
+        } else if (cause instanceof GettingAuthenticationDataException) {
+            return new GettingAuthenticationDataException(msg);
+        } else if (cause instanceof UnsupportedAuthenticationException) {
+            return new UnsupportedAuthenticationException(msg);
+        } else if (cause instanceof BrokerPersistenceException) {
+            return new BrokerPersistenceException(msg);
+        } else if (cause instanceof BrokerMetadataException) {
+            return new BrokerMetadataException(msg);
+        } else if (cause instanceof ProducerBusyException) {
+            return new ProducerBusyException(msg);
+        } else if (cause instanceof ConsumerBusyException) {
+            return new ConsumerBusyException(msg);
+        } else if (cause instanceof NotConnectedException) {
+            return new NotConnectedException();
+        } else if (cause instanceof InvalidMessageException) {
+            return new InvalidMessageException(msg);
+        } else if (cause instanceof InvalidTopicNameException) {
+            return new InvalidTopicNameException(msg);
+        } else if (cause instanceof NotSupportedException) {
+            return new NotSupportedException(msg);
+        } else if (cause instanceof ProducerQueueIsFullError) {
+            return new ProducerQueueIsFullError(msg);
+        } else if (cause instanceof ProducerBlockedQuotaExceededError) {
+            return new ProducerBlockedQuotaExceededError(msg);
+        } else if (cause instanceof ProducerBlockedQuotaExceededException) {
+            return new ProducerBlockedQuotaExceededException(msg);
+        } else if (cause instanceof ChecksumException) {
+            return new ChecksumException(msg);
+        } else if (cause instanceof CryptoException) {
+            return new CryptoException(msg);
+        } else {
+            return new PulsarClientException(t);
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f8f6569..fe9cf66 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -180,16 +180,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void acknowledge(MessageId messageId) throws PulsarClientException {
         try {
             acknowledgeAsync(messageId).get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -206,16 +198,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
         try {
             acknowledgeCumulativeAsync(messageId).get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -264,16 +248,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void unsubscribe() throws PulsarClientException {
         try {
             unsubscribeAsync().get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -284,16 +260,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void close() throws PulsarClientException {
         try {
             closeAsync().get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 0df9a89..6913442 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
@@ -87,16 +86,8 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     public Consumer<T> subscribe() throws PulsarClientException {
         try {
             return subscribeAsync().get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f329cd7..a25a98a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -317,9 +317,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             messageProcessed(interceptMsg);
             return interceptMsg;
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
             stats.incrementNumReceiveFailed();
-            throw new PulsarClientException(e);
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -363,12 +362,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             }
             return interceptMsg;
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
             State state = getState();
             if (state != State.Closing && state != State.Closed) {
                 stats.incrementNumReceiveFailed();
-                throw new PulsarClientException(e);
+                throw PulsarClientException.unwrap(e);
             } else {
                 return null;
             }
@@ -1369,8 +1366,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     public void seek(MessageId messageId) throws PulsarClientException {
         try {
             seekAsync(messageId).get();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -1378,8 +1375,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     public void seek(long timestamp) throws PulsarClientException {
         try {
             seekAsync(timestamp).get();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -1457,8 +1454,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             }
 
             return hasMessageAvailableAsync().get();
-        } catch (ExecutionException | InterruptedException e) {
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f2f0c35..a5e5138 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -339,9 +339,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             unAckedMessageTracker.add(message.getMessageId());
             resumeReceivingFromPausedConsumersIfNeeded();
             return message;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -356,9 +355,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             }
             resumeReceivingFromPausedConsumersIfNeeded();
             return message;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -570,10 +568,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     public void seek(MessageId messageId) throws PulsarClientException {
         try {
             seekAsync(messageId).get();
-        } catch (ExecutionException e) {
-            throw new PulsarClientException(e.getCause());
-        } catch (InterruptedException e) {
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -581,10 +577,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     public void seek(long timestamp) throws PulsarClientException {
         try {
             seekAsync(timestamp).get();
-        } catch (ExecutionException e) {
-            throw new PulsarClientException(e.getCause());
-        } catch (InterruptedException e) {
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index fa046ca..2913efd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.impl;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -83,16 +82,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
             }
 
             return sendFuture.get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -100,16 +91,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
     public void flush() throws PulsarClientException {
         try {
             flushAsync().get();
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof PulsarClientException) {
-                throw (PulsarClientException) cause;
-            } else {
-                throw new PulsarClientException(cause);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -119,16 +102,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
     public void close() throws PulsarClientException {
         try {
             closeAsync().get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 193d034..888b58e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
@@ -84,16 +83,8 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
     public Producer<T> create() throws PulsarClientException {
         try {
             return createAsync().get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 0e58796..764d25c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -343,7 +343,7 @@ public class PulsarClientImpl implements PulsarClient {
                     listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
             } else {
                 consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
-                        consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors, 
+                        consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
                         this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
             }
 
@@ -519,15 +519,8 @@ public class PulsarClientImpl implements PulsarClient {
     public void close() throws PulsarClientException {
         try {
             closeAsync().get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -580,7 +573,7 @@ public class PulsarClientImpl implements PulsarClient {
             conf.getAuthentication().close();
         } catch (Throwable t) {
             log.warn("Failed to shutdown Pulsar client", t);
-            throw new PulsarClientException(t);
+            throw PulsarClientException.unwrap(t);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index dc898cc..47f7106 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -66,16 +65,8 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
     public Reader<T> create() throws PulsarClientException {
         try {
             return createAsync().get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index b0dc4b3..6beb0eb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -53,7 +53,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
     	this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
     		 schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
     }
-    
+
     public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
             SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
@@ -117,9 +117,8 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
             stats.updateNumMsgsReceived(message);
             return message;
         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
             stats.incrementNumReceiveFailed();
-            throw new PulsarClientException(e);
+            throw PulsarClientException.unwrap(e);
         } finally {
             // Finally blocked is invoked in case the block on incomingMessages is interrupted
             waitingOnReceiveForZeroQueueSize = false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
index 37454d2..4aecb10 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
@@ -48,7 +48,7 @@ public class AuthenticationBasic implements Authentication, EncodedAuthenticatio
         try {
             return new AuthenticationDataBasic(userId, password);
         } catch (Exception e) {
-            throw new PulsarClientException(e);
+            throw PulsarClientException.unwrap(e);
         }
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index f81c582..e5de503 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -436,16 +436,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
         public MessageId send() throws PulsarClientException {
             try {
                 return sendAsync().get();
-            } catch (ExecutionException e) {
-                Throwable t = e.getCause();
-                if (t instanceof PulsarClientException) {
-                    throw (PulsarClientException) t;
-                } else {
-                    throw new PulsarClientException(t);
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new PulsarClientException(e);
+            } catch (Exception e) {
+                throw PulsarClientException.unwrap(e);
             }
         }