You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/21 19:43:09 UTC
[pulsar] branch master updated: Removed null check in
MessageImpl.getMessageId() to match documented behavior. (#10025)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 424eaec Removed null check in MessageImpl.getMessageId() to match documented behavior. (#10025)
424eaec is described below
commit 424eaecb85b3235709d65b08915c9ca6fd86bd8b
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Apr 21 12:42:19 2021 -0700
Removed null check in MessageImpl.getMessageId() to match documented behavior. (#10025)
---
.../apache/pulsar/client/impl/ConsumerBase.java | 138 ++++++++++++++-------
.../apache/pulsar/client/impl/ConsumerImpl.java | 5 +
.../org/apache/pulsar/client/impl/MessageImpl.java | 1 -
.../client/impl/MultiTopicsConsumerImpl.java | 4 +
4 files changed, 101 insertions(+), 47 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index dc1c858..f444100 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -273,20 +274,36 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
abstract protected CompletableFuture<Messages<T>> internalBatchReceiveAsync();
+ private static void validateMessageId(Message<?> message) throws PulsarClientException {
+ if (message == null) {
+ throw new PulsarClientException.InvalidMessageException("Non-null message is required");
+ }
+ if (message.getMessageId() == null) {
+ throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
+ }
+ }
+
+ private static void validateMessageId(MessageId messageId) throws PulsarClientException {
+ if (messageId == null) {
+ throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
+ }
+ }
+
@Override
public void acknowledge(Message<?> message) throws PulsarClientException {
- try {
- acknowledge(message.getMessageId());
- } catch (NullPointerException npe) {
- throw new PulsarClientException.InvalidMessageException(npe.getMessage());
- }
+ validateMessageId(message);
+ acknowledge(message.getMessageId());
}
@Override
public void acknowledge(MessageId messageId) throws PulsarClientException {
+ validateMessageId(messageId);
try {
acknowledgeAsync(messageId).get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}
@@ -295,7 +312,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void acknowledge(List<MessageId> messageIdList) throws PulsarClientException {
try {
acknowledgeAsync(messageIdList).get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}
@@ -304,7 +324,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void acknowledge(Messages<?> messages) throws PulsarClientException {
try {
acknowledgeAsync(messages).get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}
@@ -316,13 +339,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
try {
reconsumeLaterAsync(message, delayTime, unit).get();
- } catch (Exception e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
+ throw PulsarClientException.unwrap(e);
}
}
@@ -330,25 +351,29 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException {
try {
reconsumeLaterAsync(messages, delayTime, unit).get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
- try {
- acknowledgeCumulative(message.getMessageId());
- } catch (NullPointerException npe) {
- throw new PulsarClientException.InvalidMessageException(npe.getMessage());
- }
+ validateMessageId(message);
+ acknowledgeCumulative(message.getMessageId());
}
@Override
public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
+ validateMessageId(messageId);
try {
acknowledgeCumulativeAsync(messageId).get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}
@@ -357,7 +382,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
try {
reconsumeLaterCumulativeAsync(message, delayTime, unit).get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}
@@ -365,21 +393,25 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
@Override
public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
try {
- return acknowledgeAsync(message.getMessageId());
- } catch (NullPointerException npe) {
- return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+ validateMessageId(message);
+ } catch (PulsarClientException e) {
+ return FutureUtil.failedFuture(e);
}
+ return acknowledgeAsync(message.getMessageId());
}
@Override
public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
- try {
- List<MessageId> messageIds = new ArrayList<>();
- messages.forEach(message -> messageIds.add(message.getMessageId()));
- return acknowledgeAsync(messageIds);
- } catch (NullPointerException npe) {
- return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+ List<MessageId> messageIds = new ArrayList<>(messages.size());
+ for (Message<?> message: messages) {
+ try {
+ validateMessageId(message);
+ } catch (PulsarClientException e) {
+ return FutureUtil.failedFuture(e);
+ }
+ messageIds.add(message.getMessageId());
}
+ return acknowledgeAsync(messageIds);
}
@Override
@@ -393,29 +425,34 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
}
try {
- return doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit);
- } catch (NullPointerException npe) {
- return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+ validateMessageId(message);
+ } catch (PulsarClientException e) {
+ return FutureUtil.failedFuture(e);
}
+ return doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit);
}
@Override
public CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit) {
- try {
- messages.forEach(message -> reconsumeLaterAsync(message,delayTime, unit));
- return CompletableFuture.completedFuture(null);
- } catch (NullPointerException npe) {
- return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+ for (Message<?> message: messages) {
+ try {
+ validateMessageId(message);
+ } catch (PulsarClientException e) {
+ return FutureUtil.failedFuture(e);
+ }
}
+ messages.forEach(message -> reconsumeLaterAsync(message,delayTime, unit));
+ return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
try {
- return acknowledgeCumulativeAsync(message.getMessageId());
- } catch (NullPointerException npe) {
- return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+ validateMessageId(message);
+ } catch (PulsarClientException e) {
+ return FutureUtil.failedFuture(e);
}
+ return acknowledgeCumulativeAsync(message.getMessageId());
}
@Override
@@ -531,7 +568,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void unsubscribe() throws PulsarClientException {
try {
unsubscribeAsync().get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}
@@ -543,7 +583,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public void close() throws PulsarClientException {
try {
closeAsync().get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}
@@ -556,7 +599,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
public MessageId getLastMessageId() throws PulsarClientException {
try {
return getLastMessageIdAsync().get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b5e587f..8919aa0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -537,6 +537,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
long delayTime,
TimeUnit unit) {
MessageId messageId = message.getMessageId();
+ if (messageId == null) {
+ return FutureUtil.failedFuture(new PulsarClientException
+ .InvalidMessageException("Cannot handle message with null messageId"));
+ }
+
if(messageId instanceof TopicMessageIdImpl) {
messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 8ed18fe..a6b6dff 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -500,7 +500,6 @@ public class MessageImpl<T> implements Message<T> {
@Override
public MessageId getMessageId() {
- checkNotNull(messageId, "Cannot get the message id of a message that was not received");
return messageId;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8e0ae7b..2272339 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -473,6 +473,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
long delayTime,
TimeUnit unit) {
MessageId messageId = message.getMessageId();
+ if (messageId == null) {
+ return FutureUtil.failedFuture(new PulsarClientException
+ .InvalidMessageException("Cannot handle message with null messageId"));
+ }
checkArgument(messageId instanceof TopicMessageIdImpl);
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
if (getState() != State.Ready) {