Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/13 01:55:28 UTC

[GitHub] [pulsar] horizonzy opened a new pull request, #15144: [enhence] [client-consumer] Always handle notifyPendingReceivedCallback in internalPinnedExecutor.

horizonzy opened a new pull request, #15144:
URL: https://github.com/apache/pulsar/pull/15144

   ### Motivation
   Handle notifyPendingReceivedCallback in internalPinnedExecutor.
   
   ### Documentation
   - [ ] `no-need-doc` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] horizonzy commented on pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1113895233

   /pulsarbot run-failure-checks
   




[GitHub] [pulsar] horizonzy commented on pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1141608192

   /pulsarbot run-failure-checks
   
   




[GitHub] [pulsar] horizonzy commented on pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1150571126

   /pulsarbot run-failure-checks




[GitHub] [pulsar] github-actions[bot] commented on pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1179460356

   The PR has had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] github-actions[bot] commented on pull request #15144: [enhence] [client-consumer] Always handle notifyPendingReceivedCallback in internalPinnedExecutor.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1097474886

   @horizonzy: Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)




[GitHub] [pulsar] github-actions[bot] commented on pull request #15144: [enhence] [client-consumer] Always handle notifyPendingReceivedCallback in internalPinnedExecutor.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1097476240

   @horizonzy:Thanks for providing doc info!




[GitHub] [pulsar] horizonzy commented on pull request #15144: [enhence] [client-consumer] Always handle notifyPendingReceivedCallback in internalPinnedExecutor.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1097471001

   @BewareMyPower @michaeljmarshall cc




[GitHub] [pulsar] horizonzy commented on pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1141628850

   @RobertIndie could you please check it again?




[GitHub] [pulsar] RobertIndie commented on a diff in pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#discussion_r849292346


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java:
##########
@@ -305,17 +306,23 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
             producer.send(message.getBytes());
         }
 
-        try {
-            consumer.receiveAsync().handle((ok, e) -> {
-                if (e == null) {
-                    // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
-                    Assert.fail();
-                }
-                return null;
-            });
-        } finally {
-            consumer.close();
-        }
+        AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+
+        consumer.receiveAsync().handle((ok, e) -> {
+            // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
+            exceptionRef.set(e);
+            return null;
+        });
+        Awaitility.await().untilAsserted(() -> assertNotNull(exceptionRef.get()));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getClass(),
+                PulsarClientException.InvalidMessageException.class));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getMessage(),
+                String.format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ",
+                        consumer.getSubscription(), consumer.getConsumerName())));
+        // zeroConsumerImpl will close when received batch message.
+        Assert.assertFalse(consumer.isConnected());

Review Comment:
   Maybe we should use `Awaitility.await().untilAsserted(...)` here, because invoking the callback and closing the consumer do not happen synchronously.
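
The point about asynchronous closing can be illustrated with a JDK-only sketch. It is not the Pulsar test itself: `AwaitStateSketch`, `awaitTrue`, and the `connected` flag are hypothetical stand-ins, and the polling helper only approximates what an Awaitility-style `untilAsserted` does. It shows why a state that flips on another thread should be polled rather than checked once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

class AwaitStateSketch {
    // Hypothetical helper: poll a condition until it holds or the timeout elapses.
    static boolean awaitTrue(BooleanSupplier condition, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Stand-in for a consumer whose close lands some time after the callback fired.
    static boolean demo() {
        AtomicBoolean connected = new AtomicBoolean(true);
        CompletableFuture.runAsync(() -> {
            awaitTrue(() -> false, 50); // simulate a delay of roughly 50 ms
            connected.set(false);
        });
        // A single immediate check of !connected.get() could fail here;
        // polling tolerates the delay.
        return awaitTrue(() -> !connected.get(), 5_000);
    }

    public static void main(String[] args) {
        System.out.println("observed disconnect: " + demo());
    }
}
```

In the real test the equivalent would presumably be wrapping the `isConnected()` assertion in the await call, as suggested above.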



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java:
##########
@@ -305,17 +306,23 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
             producer.send(message.getBytes());
         }
 
-        try {
-            consumer.receiveAsync().handle((ok, e) -> {
-                if (e == null) {
-                    // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
-                    Assert.fail();
-                }
-                return null;
-            });
-        } finally {
-            consumer.close();
-        }
+        AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+
+        consumer.receiveAsync().handle((ok, e) -> {
+            // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
+            exceptionRef.set(e);
+            return null;
+        });
+        Awaitility.await().untilAsserted(() -> assertNotNull(exceptionRef.get()));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getClass(),
+                PulsarClientException.InvalidMessageException.class));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getMessage(),

Review Comment:
   I think we don't need to await `untilAsserted` here.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#discussion_r850000388


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java:
##########
@@ -182,14 +182,20 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
         log.warn(
                 "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size",
                 subscription, consumerName);
-        // close connection
-        closeAsync().handle((ok, e) -> {

Review Comment:
   This change is not about race conditions; it is more about unifying the code. `notifyPendingReceivedCallback` should run in `internalPinnedExecutor`.
   It also fixes a bad behavior: closing the consumer first makes the pending receives get inaccurate error info.
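
A minimal sketch of the ordering described above, using only the JDK. The class, the queue, and the exception types are hypothetical stand-ins and not the actual `ZeroQueueConsumerImpl` code; the assumption is that the close path fails whatever is still pending with a generic error, so the specific error has to be delivered first.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NotifyThenCloseSketch {
    // Hypothetical stand-ins for the consumer's pinned executor and pending-receive queue.
    private final ExecutorService pinnedExecutor = Executors.newSingleThreadExecutor();
    private final Queue<CompletableFuture<String>> pendingReceives = new ConcurrentLinkedQueue<>();

    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingReceives.add(future);
        return future;
    }

    // Fail every pending receive with the given cause.
    private void failPending(Exception cause) {
        CompletableFuture<String> future;
        while ((future = pendingReceives.poll()) != null) {
            future.completeExceptionally(cause);
        }
    }

    // Both steps run as one task on the single-threaded executor, so their order is fixed.
    CompletableFuture<Void> onUnsupportedBatch() {
        return CompletableFuture.runAsync(() -> {
            // Specific cause first.
            failPending(new IllegalArgumentException("unsupported batch message"));
            // Stand-in for close: it finds nothing left to fail with the generic cause.
            failPending(new IllegalStateException("consumer closed"));
        }, pinnedExecutor);
    }

    static String demo() {
        NotifyThenCloseSketch consumer = new NotifyThenCloseSketch();
        try {
            CompletableFuture<String> receive = consumer.receiveAsync();
            consumer.onUnsupportedBatch().get(5, TimeUnit.SECONDS);
            return receive.<Throwable>handle((ok, e) -> e).get(5, TimeUnit.SECONDS).getMessage();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            consumer.pinnedExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Swapping the two `failPending` calls would hand the caller the generic "consumer closed" cause instead, which is the inaccurate error info mentioned in the comment.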





[GitHub] [pulsar] poorbarcode commented on pull request #15144: [fix] [client] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly then close consumer

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1501570391

   Since we will start the RC version of `3.0.0` on `2023-04-11`, I will change the label/milestone of PRs that have not been merged.
   - PRs of type `feature` are deferred to `3.1.0`
   - PRs of type `fix` are deferred to `3.0.1`
   
   So I am moving this PR to `3.0.1`.




[GitHub] [pulsar] horizonzy commented on a diff in pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#discussion_r849320496


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java:
##########
@@ -305,17 +306,23 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
             producer.send(message.getBytes());
         }
 
-        try {
-            consumer.receiveAsync().handle((ok, e) -> {
-                if (e == null) {
-                    // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
-                    Assert.fail();
-                }
-                return null;
-            });
-        } finally {
-            consumer.close();
-        }
+        AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+
+        consumer.receiveAsync().handle((ok, e) -> {
+            // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
+            exceptionRef.set(e);
+            return null;
+        });
+        Awaitility.await().untilAsserted(() -> assertNotNull(exceptionRef.get()));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getClass(),
+                PulsarClientException.InvalidMessageException.class));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getMessage(),

Review Comment:
   It should be needed here, because message consumption is asynchronous.





[GitHub] [pulsar] RobertIndie commented on a diff in pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#discussion_r849362196


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java:
##########
@@ -305,17 +306,23 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
             producer.send(message.getBytes());
         }
 
-        try {
-            consumer.receiveAsync().handle((ok, e) -> {
-                if (e == null) {
-                    // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
-                    Assert.fail();
-                }
-                return null;
-            });
-        } finally {
-            consumer.close();
-        }
+        AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+
+        consumer.receiveAsync().handle((ok, e) -> {
+            // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
+            exceptionRef.set(e);
+            return null;
+        });
+        Awaitility.await().untilAsserted(() -> assertNotNull(exceptionRef.get()));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getClass(),
+                PulsarClientException.InvalidMessageException.class));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getMessage(),

Review Comment:
   Sorry, I didn't express my point clearly. Lines 317 and 319 don't seem to need to await `untilAsserted`. Once we get the `exceptionRef` on line 316, the subsequent asserts should not have to wait.
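
The "wait once, then assert directly" idea can be sketched with plain `CompletableFuture` calls. This is an illustration under assumptions, not the test from the PR: `AwaitOnceSketch`, `awaitFailure`, and the exception type are hypothetical, and a blocking `get` with a timeout stands in for the first await.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AwaitOnceSketch {
    // Block once until the future settles and return its failure (null on success).
    static Throwable awaitFailure(CompletableFuture<?> future) {
        try {
            return future.<Throwable>handle((ok, e) -> e).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("future did not settle in time", e);
        }
    }

    static String demo() {
        CompletableFuture<String> receive = new CompletableFuture<>();
        // Stand-in for the client failing the pending receive on another thread.
        CompletableFuture.runAsync(() ->
                receive.completeExceptionally(new IllegalArgumentException("unsupported batch message")));

        Throwable failure = awaitFailure(receive); // the only wait
        // From here on the captured value cannot change, so plain checks are enough.
        if (!(failure instanceof IllegalArgumentException)) {
            throw new AssertionError("unexpected failure: " + failure);
        }
        return failure.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Once the failure has been captured, the type and message checks are ordinary synchronous assertions; only the initial capture depends on timing.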





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#discussion_r849652900


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java:
##########
@@ -182,14 +182,20 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
         log.warn(
                 "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size",
                 subscription, consumerName);
-        // close connection
-        closeAsync().handle((ok, e) -> {

Review Comment:
   The `closeAsync()` method already fails all messages in the `pendingReceives` queue. As such, the callback on the `closeAsync()` future has never had any effect. I think it is a good catch to re-order these tasks. However, I wonder whether there is any chance of race conditions because we're delaying putting the consumer into a `Closing` state.





[GitHub] [pulsar] github-actions[bot] commented on pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1141597604

   The PR has had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] horizonzy commented on a diff in pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#discussion_r849320709


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java:
##########
@@ -305,17 +306,23 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
             producer.send(message.getBytes());
         }
 
-        try {
-            consumer.receiveAsync().handle((ok, e) -> {
-                if (e == null) {
-                    // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
-                    Assert.fail();
-                }
-                return null;
-            });
-        } finally {
-            consumer.close();
-        }
+        AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+
+        consumer.receiveAsync().handle((ok, e) -> {
+            // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
+            exceptionRef.set(e);
+            return null;
+        });
+        Awaitility.await().untilAsserted(() -> assertNotNull(exceptionRef.get()));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getClass(),
+                PulsarClientException.InvalidMessageException.class));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getMessage(),
+                String.format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ",
+                        consumer.getSubscription(), consumer.getConsumerName())));
+        // zeroConsumerImpl will close when received batch message.
+        Assert.assertFalse(consumer.isConnected());

Review Comment:
   fine.
   





[GitHub] [pulsar] BewareMyPower commented on pull request #15144: [enhence] [client-consumer] Always handle notifyPendingReceivedCallback in internalPinnedExecutor.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#issuecomment-1097502964

   Currently, a zero queue consumer cannot receive batched messages. This is intended; otherwise it should be a consumer whose queue size is 1.
   
   The `ZeroQueueConsumerImpl#receiveIndividualMessagesFromBatch` implementation is just for this case. See `ZeroQueueSizeTest#testFailedZeroQueueSizeBatchMessage`.




[GitHub] [pulsar] horizonzy commented on a diff in pull request #15144: [fix] [client-consumer] NotifyPendingReceivedCallback when zeroConsumerImpl received batch message firstly , then close consumer.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #15144:
URL: https://github.com/apache/pulsar/pull/15144#discussion_r849371058


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java:
##########
@@ -305,17 +306,23 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
             producer.send(message.getBytes());
         }
 
-        try {
-            consumer.receiveAsync().handle((ok, e) -> {
-                if (e == null) {
-                    // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
-                    Assert.fail();
-                }
-                return null;
-            });
-        } finally {
-            consumer.close();
-        }
+        AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+
+        consumer.receiveAsync().handle((ok, e) -> {
+            // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
+            exceptionRef.set(e);
+            return null;
+        });
+        Awaitility.await().untilAsserted(() -> assertNotNull(exceptionRef.get()));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getClass(),
+                PulsarClientException.InvalidMessageException.class));
+        Awaitility.await().untilAsserted(() -> assertEquals(exceptionRef.get().getMessage(),

Review Comment:
   Got it.


