You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2019/06/18 08:55:49 UTC
[pulsar] branch master updated: Do not strip ExecutionException
from the stack trace (#4493)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 571b684 Do not strip ExecutionException from the stack trace (#4493)
571b684 is described below
commit 571b68464e25e8e71d3d278f58ba3a3df0c67a5b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jun 18 01:55:43 2019 -0700
Do not strip ExecutionException from the stack trace (#4493)
* Do not strip ExecutionException from the stack trace
---
.../pulsar/client/api/PulsarClientException.java | 85 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 48 ++----------
.../pulsar/client/impl/ConsumerBuilderImpl.java | 13 +---
.../apache/pulsar/client/impl/ConsumerImpl.java | 19 ++---
.../client/impl/MultiTopicsConsumerImpl.java | 22 ++----
.../apache/pulsar/client/impl/ProducerBase.java | 37 ++--------
.../pulsar/client/impl/ProducerBuilderImpl.java | 13 +---
.../pulsar/client/impl/PulsarClientImpl.java | 15 +---
.../pulsar/client/impl/ReaderBuilderImpl.java | 13 +---
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 5 +-
.../client/impl/auth/AuthenticationBasic.java | 2 +-
.../pulsar/functions/instance/ContextImpl.java | 12 +--
12 files changed, 130 insertions(+), 154 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 7aa72e1..bb681ad 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
/**
* Base type of exception thrown by Pulsar client
@@ -62,12 +63,20 @@ public class PulsarClientException extends IOException {
}
public static class TimeoutException extends PulsarClientException {
+ public TimeoutException(Throwable t) {
+ super(t);
+ }
+
public TimeoutException(String msg) {
super(msg);
}
}
public static class IncompatibleSchemaException extends PulsarClientException {
+ public IncompatibleSchemaException(Throwable t) {
+ super(t);
+ }
+
public IncompatibleSchemaException(String msg) {
super(msg);
}
@@ -86,6 +95,10 @@ public class PulsarClientException extends IOException {
}
public static class ConnectException extends PulsarClientException {
+ public ConnectException(Throwable t) {
+ super(t);
+ }
+
public ConnectException(String msg) {
super(msg);
}
@@ -212,4 +225,76 @@ public class PulsarClientException extends IOException {
super(msg);
}
}
+
+ public static PulsarClientException unwrap(Throwable t) {
+ if (t instanceof PulsarClientException) {
+ return (PulsarClientException) t;
+ } else if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else if (!(t instanceof ExecutionException)) {
+ // Generic exception
+ return new PulsarClientException(t);
+ } else if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ return new PulsarClientException(t);
+ }
+
+ // Unwrap the exception to keep the same exception type but a stack trace that includes the application calling
+ // site
+ Throwable cause = t.getCause();
+ String msg = cause.getMessage();
+ if (cause instanceof TimeoutException) {
+ return new TimeoutException(msg);
+ } else if (cause instanceof InvalidConfigurationException) {
+ return new InvalidConfigurationException(msg);
+ } else if (cause instanceof AuthenticationException) {
+ return new AuthenticationException(msg);
+ } else if (cause instanceof IncompatibleSchemaException) {
+ return new IncompatibleSchemaException(msg);
+ } else if (cause instanceof TooManyRequestsException) {
+ return new TooManyRequestsException(msg);
+ } else if (cause instanceof LookupException) {
+ return new LookupException(msg);
+ } else if (cause instanceof ConnectException) {
+ return new ConnectException(msg);
+ } else if (cause instanceof AlreadyClosedException) {
+ return new AlreadyClosedException(msg);
+ } else if (cause instanceof TopicTerminatedException) {
+ return new TopicTerminatedException(msg);
+ } else if (cause instanceof AuthorizationException) {
+ return new AuthorizationException(msg);
+ } else if (cause instanceof GettingAuthenticationDataException) {
+ return new GettingAuthenticationDataException(msg);
+ } else if (cause instanceof UnsupportedAuthenticationException) {
+ return new UnsupportedAuthenticationException(msg);
+ } else if (cause instanceof BrokerPersistenceException) {
+ return new BrokerPersistenceException(msg);
+ } else if (cause instanceof BrokerMetadataException) {
+ return new BrokerMetadataException(msg);
+ } else if (cause instanceof ProducerBusyException) {
+ return new ProducerBusyException(msg);
+ } else if (cause instanceof ConsumerBusyException) {
+ return new ConsumerBusyException(msg);
+ } else if (cause instanceof NotConnectedException) {
+ return new NotConnectedException();
+ } else if (cause instanceof InvalidMessageException) {
+ return new InvalidMessageException(msg);
+ } else if (cause instanceof InvalidTopicNameException) {
+ return new InvalidTopicNameException(msg);
+ } else if (cause instanceof NotSupportedException) {
+ return new NotSupportedException(msg);
+ } else if (cause instanceof ProducerQueueIsFullError) {
+ return new ProducerQueueIsFullError(msg);
+ } else if (cause instanceof ProducerBlockedQuotaExceededError) {
+ return new ProducerBlockedQuotaExceededError(msg);
+ } else if (cause instanceof ProducerBlockedQuotaExceededException) {
+ return new ProducerBlockedQuotaExceededException(msg);
+ } else if (cause instanceof ChecksumException) {
+ return new ChecksumException(msg);
+ } else if (cause instanceof CryptoException) {
+ return new CryptoException(msg);
+ } else {
+ return new PulsarClientException(t);
+ }
+ }
}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f8f6569..fe9cf66 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -180,16 +180,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void acknowledge(MessageId messageId) throws PulsarClientException {
try {
acknowledgeAsync(messageId).get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -206,16 +198,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
try {
acknowledgeCumulativeAsync(messageId).get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -264,16 +248,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void unsubscribe() throws PulsarClientException {
try {
unsubscribeAsync().get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -284,16 +260,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void close() throws PulsarClientException {
try {
closeAsync().get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 0df9a89..6913442 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -87,16 +86,8 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
public Consumer<T> subscribe() throws PulsarClientException {
try {
return subscribeAsync().get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f329cd7..a25a98a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -317,9 +317,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
messageProcessed(interceptMsg);
return interceptMsg;
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
stats.incrementNumReceiveFailed();
- throw new PulsarClientException(e);
+ throw PulsarClientException.unwrap(e);
}
}
@@ -363,12 +362,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
return interceptMsg;
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
State state = getState();
if (state != State.Closing && state != State.Closed) {
stats.incrementNumReceiveFailed();
- throw new PulsarClientException(e);
+ throw PulsarClientException.unwrap(e);
} else {
return null;
}
@@ -1369,8 +1366,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
public void seek(MessageId messageId) throws PulsarClientException {
try {
seekAsync(messageId).get();
- } catch (ExecutionException | InterruptedException e) {
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -1378,8 +1375,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
public void seek(long timestamp) throws PulsarClientException {
try {
seekAsync(timestamp).get();
- } catch (ExecutionException | InterruptedException e) {
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -1457,8 +1454,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
return hasMessageAvailableAsync().get();
- } catch (ExecutionException | InterruptedException e) {
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f2f0c35..a5e5138 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -339,9 +339,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -356,9 +355,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -570,10 +568,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
public void seek(MessageId messageId) throws PulsarClientException {
try {
seekAsync(messageId).get();
- } catch (ExecutionException e) {
- throw new PulsarClientException(e.getCause());
- } catch (InterruptedException e) {
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -581,10 +577,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
public void seek(long timestamp) throws PulsarClientException {
try {
seekAsync(timestamp).get();
- } catch (ExecutionException e) {
- throw new PulsarClientException(e.getCause());
- } catch (InterruptedException e) {
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index fa046ca..2913efd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -83,16 +82,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
}
return sendFuture.get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -100,16 +91,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
public void flush() throws PulsarClientException {
try {
flushAsync().get();
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof PulsarClientException) {
- throw (PulsarClientException) cause;
- } else {
- throw new PulsarClientException(cause);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -119,16 +102,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
public void close() throws PulsarClientException {
try {
closeAsync().get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 193d034..888b58e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
@@ -84,16 +83,8 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
public Producer<T> create() throws PulsarClientException {
try {
return createAsync().get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 0e58796..764d25c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -343,7 +343,7 @@ public class PulsarClientImpl implements PulsarClient {
listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
- consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
+ consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
}
@@ -519,15 +519,8 @@ public class PulsarClientImpl implements PulsarClient {
public void close() throws PulsarClientException {
try {
closeAsync().get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -580,7 +573,7 @@ public class PulsarClientImpl implements PulsarClient {
conf.getAuthentication().close();
} catch (Throwable t) {
log.warn("Failed to shutdown Pulsar client", t);
- throw new PulsarClientException(t);
+ throw PulsarClientException.unwrap(t);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index dc898cc..47f7106 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -66,16 +65,8 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
public Reader<T> create() throws PulsarClientException {
try {
return createAsync().get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index b0dc4b3..6beb0eb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -53,7 +53,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, startMessageId,
schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
-
+
public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
@@ -117,9 +117,8 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
stats.updateNumMsgsReceived(message);
return message;
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
stats.incrementNumReceiveFailed();
- throw new PulsarClientException(e);
+ throw PulsarClientException.unwrap(e);
} finally {
// Finally blocked is invoked in case the block on incomingMessages is interrupted
waitingOnReceiveForZeroQueueSize = false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
index 37454d2..4aecb10 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
@@ -48,7 +48,7 @@ public class AuthenticationBasic implements Authentication, EncodedAuthenticatio
try {
return new AuthenticationDataBasic(userId, password);
} catch (Exception e) {
- throw new PulsarClientException(e);
+ throw PulsarClientException.unwrap(e);
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index f81c582..e5de503 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -436,16 +436,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
public MessageId send() throws PulsarClientException {
try {
return sendAsync().get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
}