Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/20 04:30:22 UTC

[GitHub] [pulsar] heesung-sn opened a new pull request, #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

heesung-sn opened a new pull request, #17728:
URL: https://github.com/apache/pulsar/pull/17728

   
   <!-- Either this PR fixes an issue, -->
   
   Fixes #<xyz>
   
   <!-- or this PR is one task of an issue -->
   
   Master Issue: #<xyz>
   
   ### Motivation
   
   
   
   https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java#L190
   
   ```
       @Override
       public CompletableFuture<Message<T>> readNextAsync() {
           CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
           receiveFuture.whenComplete((msg, t) -> {
              if (msg != null) {
                  consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
                      log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
                              getConsumer().getSubscription(), msg.getMessageId(), ex);
                      return null;
                  });
              }
           });
           return receiveFuture;
       }
   ```
   
   I see ack failures caused by a null messageId, because `TableViewImpl` releases the message before `ReaderImpl` calls `acknowledgeCumulativeAsync`.
   
   https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java#L176
   
   
   
   ```
   2022-09-16T13:59:31,448 - WARN  - [pulsar-client-internal-39-1:ReaderImpl@184] - [persistent://public/default/test][reader-9c73a60e29] acknowledge message null cumulative fail.
   org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Cannot handle message with null messageId
   	at org.apache.pulsar.client.impl.ConsumerBase.validateMessageId(ConsumerBase.java:358) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:554) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
   	at org.apache.pulsar.client.impl.ReaderImpl.lambda$readNextAsync$3(ReaderImpl.java:183) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
   	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
   	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
   	at org.apache.pulsar.client.impl.ConsumerBase.lambda$completePendingReceive$0(ConsumerBase.java:288) ~[pulsar-client-original-2.11.0-SNAPSHOT.jar:?]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at java.lang.Thread.run(Thread.java:833) ~[?:?]
   ```
   
   
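   The root cause is that `readNextAsync()` returns `receiveFuture` itself instead of the future produced by `receiveFuture.whenComplete(...)`. Callers such as `TableViewImpl` can therefore process and release the message before the acknowledgment callback reads its messageId.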
   
   ### Modifications
   
   Return the `CompletableFuture` produced by `.whenComplete(...)` (which calls `acknowledgeCumulativeAsync`) instead of `receiveFuture`.
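   To make the ordering problem concrete, here is a minimal, self-contained sketch using only `java.util.concurrent.CompletableFuture`. `PooledMessage`, `ackCumulative`, `readNextBuggy` and `readNextFixed` are hypothetical stand-ins, not Pulsar classes: `PooledMessage` clears its ID on `release()` (mimicking a recycled message), and the two helpers mirror the old and new shapes of `readNextAsync()`. In the fixed shape the caller's callback is attached to the derived future, so it cannot run until the acknowledgment call has already read the ID.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;

   final class ReadNextOrderingDemo {
       // Hypothetical stand-in for a pooled message whose id is cleared on release().
       static final class PooledMessage {
           private volatile String messageId;
           PooledMessage(String messageId) { this.messageId = messageId; }
           String getMessageId() { return messageId; }
           void release() { messageId = null; }
       }

       // Records the id observed by the simulated cumulative ack.
       static final AtomicReference<String> ackedId = new AtomicReference<>();

       static void ackCumulative(PooledMessage msg) {
           ackedId.set(msg.getMessageId()); // null here corresponds to "null messageId"
       }

       // Old shape: the ack is a side branch, and the ORIGINAL future is returned.
       static CompletableFuture<PooledMessage> readNextBuggy(CompletableFuture<PooledMessage> receive) {
           receive.whenComplete((msg, t) -> {
               if (msg != null) {
                   ackCumulative(msg);
               }
           });
           return receive;
       }

       // New shape: the DERIVED future is returned, so callers run after the ack call.
       static CompletableFuture<PooledMessage> readNextFixed(CompletableFuture<PooledMessage> receive) {
           return receive.thenApply(msg -> {
               ackCumulative(msg);
               return msg;
           });
       }

       static String run(boolean fixed) {
           ackedId.set("unset");
           CompletableFuture<PooledMessage> receive = new CompletableFuture<>();
           CompletableFuture<PooledMessage> returned = fixed ? readNextFixed(receive) : readNextBuggy(receive);
           returned.thenAccept(PooledMessage::release); // caller (like TableViewImpl) releases the message
           receive.complete(new PooledMessage("1:0:-1")); // arbitrary placeholder id
           return ackedId.get();
       }

       public static void main(String[] args) {
           System.out.println("buggy: " + run(false));
           System.out.println("fixed: " + run(true));
       }
   }
   ```

   With the current OpenJDK implementation the buggy variant typically prints `buggy: null`, because dependents registered later on the same future tend to run first. That ordering is an implementation detail rather than a guarantee, which is why returning the derived future is the safer shape.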
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   
   This change added tests and can be verified as follows:
   
     - Added a unit test confirming that `acknowledgeCumulativeAsync()` gets called after the messageId null check.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/heesung-sn/pulsar/pull/9
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] heesung-sn commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

heesung-sn commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r976646524


##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java:
##########
@@ -64,13 +66,20 @@ public void clean() {
     }
 
     @Test
-    void shouldSupportCancellingReadNextAsync() {
+    void shouldSupportCancellingReadNextAsync() throws IllegalAccessException {
         // given
-        CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+        reader.readNextAsync();
         Awaitility.await().untilAsserted(() -> {
             assertTrue(reader.getConsumer().hasNextPendingReceive());
         });
 
+        ConsumerBase consumer = (ConsumerBase)
+                FieldUtils.readDeclaredField(reader, "consumer", true);
+        ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>
+                pendingReceives = (ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>)
+                FieldUtils.readField(consumer, "pendingReceives", true);
+        CompletableFuture<Message<byte[]>> future = pendingReceives.peek();
+

Review Comment:
   Yes, this seems to be the better approach if we want to keep supporting the `ReaderImpl.readNextAsync().cancel()` behavior.
   
   
   I see that the future returned by `MultiTopicsReaderImpl.readNextAsync()` is not cancellable now. Should we make the same change (a `CompletableFutureCancellationHandler`) in `MultiTopicsReaderImpl.readNextAsync()` too?
   





[GitHub] [pulsar] mattisonchao commented on pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

mattisonchao commented on PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#issuecomment-1257380256

   Hi @heesung-sn 
   
   I'm not sure whether this PR introduces two things:
   1. A fix for the ack failure caused by a null messageId.
   2. Cancellation support (is that a new feature here?).




[GitHub] [pulsar] heesung-sn commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

heesung-sn commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r976646524


##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java:
##########
@@ -64,13 +66,20 @@ public void clean() {
     }
 
     @Test
-    void shouldSupportCancellingReadNextAsync() {
+    void shouldSupportCancellingReadNextAsync() throws IllegalAccessException {
         // given
-        CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+        reader.readNextAsync();
         Awaitility.await().untilAsserted(() -> {
             assertTrue(reader.getConsumer().hasNextPendingReceive());
         });
 
+        ConsumerBase consumer = (ConsumerBase)
+                FieldUtils.readDeclaredField(reader, "consumer", true);
+        ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>
+                pendingReceives = (ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>)
+                FieldUtils.readField(consumer, "pendingReceives", true);
+        CompletableFuture<Message<byte[]>> future = pendingReceives.peek();
+

Review Comment:
   Yes, this seems to be the better approach if we want to keep supporting the `ReaderImpl.readNextAsync().cancel()` behavior.
   
   
   I see that the original future, `multiTopicsConsumer.receiveAsync()`, in `MultiTopicsReaderImpl.readNextAsync()` is not cancellable now. Should we make the same change (a `CompletableFutureCancellationHandler`) in `MultiTopicsReaderImpl.readNextAsync()` too? Is this another bug?
   
   





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

heesung-sn commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r975765216


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java:
##########
@@ -177,17 +177,15 @@ public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientExcept
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
-        receiveFuture.whenComplete((msg, t) -> {
-           if (msg != null) {
-               consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
-                   log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
-                           getConsumer().getSubscription(), msg.getMessageId(), ex);
-                   return null;
-               });
-           }
+        return consumer.receiveAsync().thenApply(msg -> {

Review Comment:
   Here, the modification returns the downstream `CompletableFuture` produced by `thenApply(msg -> { acknowledgeCumulativeAsync(...) })` instead of `receiveFuture`, as `MultiTopicsReaderImpl.readNextAsync()` already does.
   
   The messageId can be null after `message.release()`.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

heesung-sn commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r976646524


##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java:
##########
@@ -64,13 +66,20 @@ public void clean() {
     }
 
     @Test
-    void shouldSupportCancellingReadNextAsync() {
+    void shouldSupportCancellingReadNextAsync() throws IllegalAccessException {
         // given
-        CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+        reader.readNextAsync();
         Awaitility.await().untilAsserted(() -> {
             assertTrue(reader.getConsumer().hasNextPendingReceive());
         });
 
+        ConsumerBase consumer = (ConsumerBase)
+                FieldUtils.readDeclaredField(reader, "consumer", true);
+        ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>
+                pendingReceives = (ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>)
+                FieldUtils.readField(consumer, "pendingReceives", true);
+        CompletableFuture<Message<byte[]>> future = pendingReceives.peek();
+

Review Comment:
   Yes, this seems to be the better approach if we want to keep supporting the `ReaderImpl.readNextAsync().cancel()` behavior.
   
   
   I see that the original future, `multiTopicsConsumer.receiveAsync()`, in `MultiTopicsReaderImpl.readNextAsync()` is not cancellable now. Should we make the same change (a `CompletableFutureCancellationHandler`) in `MultiTopicsReaderImpl.readNextAsync()` too?
   





[GitHub] [pulsar] heesung-sn commented on pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

heesung-sn commented on PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#issuecomment-1256437345

   Raised a cherry-pick PR here.
   
   https://github.com/apache/pulsar/pull/17828




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

codelipenghui commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r975251099


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java:
##########
@@ -177,17 +177,16 @@ public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientExcept
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
-        receiveFuture.whenComplete((msg, t) -> {
-           if (msg != null) {
-               consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
-                   log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
-                           getConsumer().getSubscription(), msg.getMessageId(), ex);
-                   return null;
-               });
-           }
-        });
-        return receiveFuture;
+        return consumer.receiveAsync()
+                .whenComplete((msg, t) -> {

Review Comment:
   If we don't care about `t` here, maybe we can change it to `thenApply()`?
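   A small sketch of the difference being suggested, assuming a plain `CompletableFuture<String>` stands in for the receive future (the class and method names are hypothetical): `whenComplete` runs its callback on both success and failure, so it needs the `msg != null` guard, while `thenApply` is skipped entirely when the upstream future fails.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicInteger;

   final class ThenApplyVsWhenCompleteDemo {
       // Counts how often the callback runs for a receive that succeeds or fails.
       static int callbackCalls(boolean useThenApply, boolean fail) {
           AtomicInteger calls = new AtomicInteger();
           CompletableFuture<String> receive = new CompletableFuture<>();
           if (useThenApply) {
               // Skipped when `receive` fails; `msg` is only ever the successful result.
               receive.thenApply(msg -> {
                   calls.incrementAndGet();
                   return msg;
               });
           } else {
               // Runs on success AND failure; on failure `msg` is null and `t` is set.
               receive.whenComplete((msg, t) -> calls.incrementAndGet());
           }
           if (fail) {
               receive.completeExceptionally(new RuntimeException("receive failed"));
           } else {
               receive.complete("msg");
           }
           return calls.get();
       }

       public static void main(String[] args) {
           System.out.println("thenApply on failure: " + callbackCalls(true, true));     // 0
           System.out.println("whenComplete on failure: " + callbackCalls(false, true)); // 1
       }
   }
   ```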



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -217,4 +224,57 @@ public void testPublishNullValue() throws Exception {
         assertEquals(tv1.size(), 1);
         assertEquals(tv.get("key2"), "value2");
     }
+
+    @DataProvider(name = "partitionedTopic")
+    public static Object[][] partitioned() {
+        return new Object[][] {{true}, {false}};

Review Comment:
   When I ran the test on my laptop, the partitioned-topic case always passed, even without the fix.
   The non-partitioned-topic case works as expected: without the fix I see the warning logs below, and after applying the fix the test passes.
   
   ```
   2022-09-20T20:05:46,640 - WARN  - [pulsar-client-internal-39-1:ReaderImpl@184] - [persistent://public/default/tableview-no-partition-ack-test][reader-57f0eede9d] acknowledge message null cumulative fail.
   org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Cannot handle message with null messageId
   	at org.apache.pulsar.client.impl.ConsumerBase.validateMessageId(ConsumerBase.java:358) ~[classes/:?]
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:554) ~[classes/:?]
   	at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732) ~[?:?]
   	at org.mockito.internal.util.reflection.InstrumentationMemberAccessor$Dispatcher$ByteBuddy$ktU1uSvS.invokeWithArguments(Unknown Source) ~[?:?]
   	at org.mockito.internal.util.reflection.InstrumentationMemberAccessor.invoke(InstrumentationMemberAccessor.java:239) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.util.reflection.ModuleMemberAccessor.invoke(ModuleMemberAccessor.java:55) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.tryInvoke(MockMethodAdvice.java:333) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.access$500(MockMethodAdvice.java:60) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice$RealMethodCall.invoke(MockMethodAdvice.java:253) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:142) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:45) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.Answers.answer(Answers.java:99) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:110) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151) ~[mockito-core-3.12.4.jar:?]
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:554) ~[classes/:?]
   	at org.apache.pulsar.client.impl.ReaderImpl.lambda$readNextAsync$3(ReaderImpl.java:183) ~[classes/:?]
   	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
   	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
   	at org.apache.pulsar.client.impl.ConsumerImpl.lambda$internalReceiveAsync$4(ConsumerImpl.java:487) ~[classes/:?]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at java.lang.Thread.run(Thread.java:833) ~[?:?]
   ```





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

heesung-sn commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r975567401


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -217,4 +224,57 @@ public void testPublishNullValue() throws Exception {
         assertEquals(tv1.size(), 1);
         assertEquals(tv.get("key2"), "value2");
     }
+
+    @DataProvider(name = "partitionedTopic")
+    public static Object[][] partitioned() {
+        return new Object[][] {{true}, {false}};

Review Comment:
   Yes, the partitioned-topic case should pass without this fix.
   
   However, without this fix, the non-partitioned-topic case should fail its assertions.
   
   Please let me know if you see different behavior.
   





[GitHub] [pulsar] Jason918 commented on pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

Jason918 commented on PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#issuecomment-1255772187

   @heesung-sn Could you open another PR to cherry-pick this to branch-2.10? There are conflicts with a direct cherry-pick.




[GitHub] [pulsar] heesung-sn commented on pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

heesung-sn commented on PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#issuecomment-1258330346

   > I'm not sure if we introduce two things.
   > 
   > Fix the null messageId cause ack failure.
   > Introduce cancel ability. (is it a feature here?)
   
   
   Yes, this PR introduces:
   > 1. Fix the null messageId cause ack failure.
   
   Regarding 
   
   > 2. Introduce cancel ability. (is it a feature here?)
   
   Cancellation was already supported in `ReaderImpl`, so nothing is added there. However, `.cancel()` did not work in `MultiTopicsReaderImpl`, which is a bug, so this PR fixes that as well.
    




[GitHub] [pulsar] codelipenghui closed pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

codelipenghui closed pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId
URL: https://github.com/apache/pulsar/pull/17728




[GitHub] [pulsar] heesung-sn commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

heesung-sn commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r976795390


##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java:
##########
@@ -64,13 +66,20 @@ public void clean() {
     }
 
     @Test
-    void shouldSupportCancellingReadNextAsync() {
+    void shouldSupportCancellingReadNextAsync() throws IllegalAccessException {
         // given
-        CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+        reader.readNextAsync();
         Awaitility.await().untilAsserted(() -> {
             assertTrue(reader.getConsumer().hasNextPendingReceive());
         });
 
+        ConsumerBase consumer = (ConsumerBase)
+                FieldUtils.readDeclaredField(reader, "consumer", true);
+        ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>
+                pendingReceives = (ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>)
+                FieldUtils.readField(consumer, "pendingReceives", true);
+        CompletableFuture<Message<byte[]>> future = pendingReceives.peek();
+

Review Comment:
   I believe the current behavior of `multiTopicsConsumer.receiveAsync().cancel()` is incorrect, so I fixed that part too and added a test covering `multiTopicsConsumer.receiveAsync().cancel()`.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

codelipenghui commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r976046076


##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java:
##########
@@ -64,13 +66,20 @@ public void clean() {
     }
 
     @Test
-    void shouldSupportCancellingReadNextAsync() {
+    void shouldSupportCancellingReadNextAsync() throws IllegalAccessException {
         // given
-        CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+        reader.readNextAsync();
         Awaitility.await().untilAsserted(() -> {
             assertTrue(reader.getConsumer().hasNextPendingReceive());
         });
 
+        ConsumerBase consumer = (ConsumerBase)
+                FieldUtils.readDeclaredField(reader, "consumer", true);
+        ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>
+                pendingReceives = (ConcurrentLinkedQueue<CompletableFuture<Message<byte[]>>>)
+                FieldUtils.readField(consumer, "pendingReceives", true);
+        CompletableFuture<Message<byte[]>> future = pendingReceives.peek();
+

Review Comment:
   This breaks the `reader.readNextAsync().cancel()` behavior, because users will not reach into `pendingReceives` the way this test does. I think the reason is that the future instance in the pending receives is not the same one that is returned to the user.
   
   I tried the following, and it should work for this case:
   
   ```java
       public CompletableFuture<Message<T>> readNextAsync() {
           CompletableFuture<Message<T>> originalFuture = consumer.receiveAsync();
           CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
               consumer.acknowledgeCumulativeAsync(msg)
                       .exceptionally(ex -> {
                           log.error("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
                                   getConsumer().getSubscription(), msg.getMessageId(), ex);
                           return null;
                       });
               return msg;
           });
           CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
           handler.attachToFuture(result);
           handler.setCancelAction(() -> originalFuture.cancel(false));
           return result;
       }
   ```
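   Why the wrapper is needed: cancelling a future returned by `thenApply` completes only that future and does not reach the upstream one, so the pending receive would stay queued. The sketch below reproduces this with plain JDK futures. `propagateCancel` is a hypothetical helper that approximates the idea behind `CompletableFutureCancellationHandler` with a `whenComplete` check for `CancellationException`; it is not Pulsar's implementation.

   ```java
   import java.util.concurrent.CancellationException;
   import java.util.concurrent.CompletableFuture;

   final class CancelPropagationDemo {
       // Hypothetical helper: when `derived` is cancelled, cancel `original` as well.
       static <T, U> CompletableFuture<U> propagateCancel(CompletableFuture<T> original,
                                                          CompletableFuture<U> derived) {
           derived.whenComplete((result, t) -> {
               if (t instanceof CancellationException) {
                   original.cancel(false);
               }
           });
           return derived;
       }

       static boolean originalCancelledAfterDerivedCancel(boolean propagate) {
           CompletableFuture<String> original = new CompletableFuture<>();       // e.g. the pending receive
           CompletableFuture<String> derived = original.thenApply(String::trim); // what the caller gets
           if (propagate) {
               propagateCancel(original, derived);
           }
           derived.cancel(false);
           return original.isCancelled();
       }

       public static void main(String[] args) {
           System.out.println("without propagation: " + originalCancelledAfterDerivedCancel(false)); // false
           System.out.println("with propagation: " + originalCancelledAfterDerivedCancel(true));     // true
       }
   }
   ```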





[GitHub] [pulsar] lin-zhao commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

lin-zhao commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r975697127


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java:
##########
@@ -177,17 +177,15 @@ public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientExcept
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
-        receiveFuture.whenComplete((msg, t) -> {
-           if (msg != null) {
-               consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
-                   log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
-                           getConsumer().getSubscription(), msg.getMessageId(), ex);
-                   return null;
-               });
-           }
+        return consumer.receiveAsync().thenApply(msg -> {

Review Comment:
   How does this fix work? Your error message says `msg` is not null but `msg.getMessageId()` is null.
   
   Unless I'm reading this wrong, your change is cleaner, but the effect is the same, barring very unexpected behavior (for example, `consumer.acknowledgeCumulativeAsync(msg)` throwing an exception, which was silently discarded before the change).
   
   If `msg.getMessageId()` is somehow null, wouldn't you get the same error?





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

codelipenghui commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r976035552


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -217,4 +224,57 @@ public void testPublishNullValue() throws Exception {
         assertEquals(tv1.size(), 1);
         assertEquals(tv.get("key2"), "value2");
     }
+
+    @DataProvider(name = "partitionedTopic")
+    public static Object[][] partitioned() {
+        return new Object[][] {{true}, {false}};

Review Comment:
   @heesung-sn Yes, the same behavior.





[GitHub] [pulsar] codelipenghui merged pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

codelipenghui merged PR #17728:
URL: https://github.com/apache/pulsar/pull/17728




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

mattisonchao commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r979515227


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java:
##########
@@ -177,17 +178,20 @@ public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientExcept
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
-        receiveFuture.whenComplete((msg, t) -> {
-           if (msg != null) {
-               consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
-                   log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
-                           getConsumer().getSubscription(), msg.getMessageId(), ex);
-                   return null;
-               });
-           }
+        CompletableFuture<Message<T>> originalFuture = consumer.receiveAsync();
+        CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
+            consumer.acknowledgeCumulativeAsync(msg)

Review Comment:
   > see ack failures due to null message id -- because we release the msg in TableViewImpl before reader.acknowledgeCumulativeAsync.
   
   Logically, this fix may still have a problem: `consumer.acknowledgeCumulativeAsync(msg)` is an async operation, so it may run in parallel with the user's processing of the message.
   Please let me know what you think, or if I'm missing something. Thanks a lot!
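   One way to reason about this concern: the stack traces earlier in this thread show `validateMessageId` being called inside the `acknowledgeCumulativeAsync` call itself, which suggests the messageId is read synchronously at call time rather than later on another thread. The sketch below illustrates why that would be sufficient, under that assumption. `PooledMessage` and `ackCumulativeAsync` are hypothetical, and the deferred executor simply queues tasks so that the release deterministically happens while the ack is still in flight.

   ```java
   import java.util.ArrayDeque;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;

   final class EagerIdCaptureDemo {
       // Hypothetical stand-in for a pooled message whose id is cleared on release().
       static final class PooledMessage {
           private volatile String messageId;
           PooledMessage(String messageId) { this.messageId = messageId; }
           String getMessageId() { return messageId; }
           void release() { messageId = null; }
       }

       // Hypothetical async ack: validates and captures the id synchronously,
       // then performs the "network" part later on the given executor.
       static CompletableFuture<String> ackCumulativeAsync(PooledMessage msg, Executor executor) {
           String id = msg.getMessageId();
           if (id == null) {
               return CompletableFuture.failedFuture(new IllegalStateException("null messageId"));
           }
           return CompletableFuture.supplyAsync(() -> "acked " + id, executor);
       }

       static String ackThenRelease() {
           ArrayDeque<Runnable> pending = new ArrayDeque<>();
           Executor deferred = pending::add;          // queue tasks instead of running them
           PooledMessage msg = new PooledMessage("1:0:-1"); // arbitrary placeholder id
           CompletableFuture<String> ack = ackCumulativeAsync(msg, deferred);
           msg.release();                             // user releases while the ack is in flight
           pending.forEach(Runnable::run);            // now run the deferred async part
           return ack.join();
       }

       public static void main(String[] args) {
           System.out.println(ackThenRelease()); // acked 1:0:-1
       }
   }
   ```

   If the ID were instead read lazily inside the async task, the same sequence would observe `null`, which is the scenario this review comment is worried about.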


