You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/21 19:43:09 UTC

[pulsar] branch master updated: Removed null check in MessageImpl.getMessageId() to match documented behavior. (#10025)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 424eaec  Removed null check in MessageImpl.getMessageId() to match documented behavior. (#10025)
424eaec is described below

commit 424eaecb85b3235709d65b08915c9ca6fd86bd8b
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Apr 21 12:42:19 2021 -0700

    Removed null check in MessageImpl.getMessageId() to match documented behavior. (#10025)
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 138 ++++++++++++++-------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   5 +
 .../org/apache/pulsar/client/impl/MessageImpl.java |   1 -
 .../client/impl/MultiTopicsConsumerImpl.java       |   4 +
 4 files changed, 101 insertions(+), 47 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index dc1c858..f444100 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -273,20 +274,36 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     abstract protected CompletableFuture<Messages<T>> internalBatchReceiveAsync();
 
+    private static void validateMessageId(Message<?> message) throws PulsarClientException {
+        if (message == null) {
+            throw new PulsarClientException.InvalidMessageException("Non-null message is required");
+        }
+        if (message.getMessageId() == null) {
+            throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
+        }
+    }
+
+    private static void validateMessageId(MessageId messageId) throws PulsarClientException {
+        if (messageId == null) {
+            throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
+        }
+    }
+
     @Override
     public void acknowledge(Message<?> message) throws PulsarClientException {
-        try {
-            acknowledge(message.getMessageId());
-        } catch (NullPointerException npe) {
-            throw new PulsarClientException.InvalidMessageException(npe.getMessage());
-        }
+        validateMessageId(message);
+        acknowledge(message.getMessageId());
     }
 
     @Override
     public void acknowledge(MessageId messageId) throws PulsarClientException {
+        validateMessageId(messageId);
         try {
             acknowledgeAsync(messageId).get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
             throw PulsarClientException.unwrap(e);
         }
     }
@@ -295,7 +312,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void acknowledge(List<MessageId> messageIdList) throws PulsarClientException {
         try {
             acknowledgeAsync(messageIdList).get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
             throw PulsarClientException.unwrap(e);
         }
     }
@@ -304,7 +324,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void acknowledge(Messages<?> messages) throws PulsarClientException {
         try {
             acknowledgeAsync(messages).get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
             throw PulsarClientException.unwrap(e);
         }
     }
@@ -316,13 +339,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
         try {
             reconsumeLaterAsync(message, delayTime, unit).get();
-        } catch (Exception e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
+            throw PulsarClientException.unwrap(e);
         }
     }
 
@@ -330,25 +351,29 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException {
         try {
             reconsumeLaterAsync(messages, delayTime, unit).get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
             throw PulsarClientException.unwrap(e);
         }
     }
 
     @Override
     public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
-        try {
-            acknowledgeCumulative(message.getMessageId());
-        } catch (NullPointerException npe) {
-            throw new PulsarClientException.InvalidMessageException(npe.getMessage());
-        }
+        validateMessageId(message);
+        acknowledgeCumulative(message.getMessageId());
     }
 
     @Override
     public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
+        validateMessageId(messageId);
         try {
             acknowledgeCumulativeAsync(messageId).get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
             throw PulsarClientException.unwrap(e);
         }
     }
@@ -357,7 +382,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
         try {
             reconsumeLaterCumulativeAsync(message, delayTime, unit).get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
             throw PulsarClientException.unwrap(e);
         }
     }
@@ -365,21 +393,25 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     @Override
     public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
         try {
-            return acknowledgeAsync(message.getMessageId());
-        } catch (NullPointerException npe) {
-            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+            validateMessageId(message);
+        } catch (PulsarClientException e) {
+            return FutureUtil.failedFuture(e);
         }
+        return acknowledgeAsync(message.getMessageId());
     }
 
     @Override
     public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
-        try {
-            List<MessageId> messageIds = new ArrayList<>();
-            messages.forEach(message -> messageIds.add(message.getMessageId()));
-            return acknowledgeAsync(messageIds);
-        } catch (NullPointerException npe) {
-            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+        List<MessageId> messageIds = new ArrayList<>(messages.size());
+        for (Message<?> message: messages) {
+            try {
+                validateMessageId(message);
+            } catch (PulsarClientException e) {
+                return FutureUtil.failedFuture(e);
+            }
+            messageIds.add(message.getMessageId());
         }
+        return acknowledgeAsync(messageIds);
     }
 
     @Override
@@ -393,29 +425,34 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
         }
         try {
-            return doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit);
-        } catch (NullPointerException npe) {
-            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+            validateMessageId(message);
+        } catch (PulsarClientException e) {
+            return FutureUtil.failedFuture(e);
         }
+        return doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit);
     }
 
     @Override
     public CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit) {
-        try {
-            messages.forEach(message -> reconsumeLaterAsync(message,delayTime, unit));
-            return CompletableFuture.completedFuture(null);
-        } catch (NullPointerException npe) {
-            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+        for (Message<?> message: messages) {
+            try {
+                validateMessageId(message);
+            } catch (PulsarClientException e) {
+                return FutureUtil.failedFuture(e);
+            }
         }
+        messages.forEach(message -> reconsumeLaterAsync(message,delayTime, unit));
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
         try {
-            return acknowledgeCumulativeAsync(message.getMessageId());
-        } catch (NullPointerException npe) {
-            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
+            validateMessageId(message);
+        } catch (PulsarClientException e) {
+            return FutureUtil.failedFuture(e);
         }
+        return acknowledgeCumulativeAsync(message.getMessageId());
     }
 
     @Override
@@ -531,7 +568,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void unsubscribe() throws PulsarClientException {
         try {
             unsubscribeAsync().get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
             throw PulsarClientException.unwrap(e);
         }
     }
@@ -543,7 +583,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public void close() throws PulsarClientException {
         try {
             closeAsync().get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
             throw PulsarClientException.unwrap(e);
         }
     }
@@ -556,7 +599,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     public MessageId getLastMessageId() throws PulsarClientException {
         try {
             return getLastMessageIdAsync().get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
             throw PulsarClientException.unwrap(e);
         }
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b5e587f..8919aa0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -537,6 +537,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                                                        long delayTime,
                                                        TimeUnit unit) {
         MessageId messageId = message.getMessageId();
+        if (messageId == null) {
+            return FutureUtil.failedFuture(new PulsarClientException
+                    .InvalidMessageException("Cannot handle message with null messageId"));
+        }
+
         if(messageId instanceof TopicMessageIdImpl) {
             messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId();
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 8ed18fe..a6b6dff 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -500,7 +500,6 @@ public class MessageImpl<T> implements Message<T> {
 
     @Override
     public MessageId getMessageId() {
-        checkNotNull(messageId, "Cannot get the message id of a message that was not received");
         return messageId;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8e0ae7b..2272339 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -473,6 +473,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                                                        long delayTime,
                                                        TimeUnit unit) {
         MessageId messageId = message.getMessageId();
+        if (messageId == null) {
+            return FutureUtil.failedFuture(new PulsarClientException
+                    .InvalidMessageException("Cannot handle message with null messageId"));
+        }
         checkArgument(messageId instanceof TopicMessageIdImpl);
         TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
         if (getState() != State.Ready) {