Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/20 12:09:36 UTC

[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17728: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId

codelipenghui commented on code in PR #17728:
URL: https://github.com/apache/pulsar/pull/17728#discussion_r975251099


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java:
##########
@@ -177,17 +177,16 @@ public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientExcept
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
-        receiveFuture.whenComplete((msg, t) -> {
-           if (msg != null) {
-               consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
-                   log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
-                           getConsumer().getSubscription(), msg.getMessageId(), ex);
-                   return null;
-               });
-           }
-        });
-        return receiveFuture;
+        return consumer.receiveAsync()
+                .whenComplete((msg, t) -> {

Review Comment:
   If we don't care about `t` here, maybe we can change it to `thenApply()`?
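
To make the suggestion above concrete, here is a minimal, self-contained sketch of the two chaining styles. It is not the Pulsar code: `AckChainingSketch`, `viaWhenComplete`, `viaThenApply`, and the `events` list are hypothetical names, and a `String` stands in for `Message<T>`. It only illustrates standard `CompletableFuture` behavior: `whenComplete` always runs its callback and passes the original result through, while `thenApply` runs only on normal completion and must return the value itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only; a String stands in for Message<T>.
class AckChainingSketch {
    // Records what each callback did, standing in for the ack side effect.
    static final List<String> events = new ArrayList<>();

    // whenComplete: the callback sees both the value and the throwable,
    // and the returned future carries the original outcome unchanged.
    static CompletableFuture<String> viaWhenComplete(CompletableFuture<String> receive) {
        return receive.whenComplete((msg, t) -> {
            if (t != null) {
                events.add("whenComplete:error");
            } else if (msg != null) {
                events.add("whenComplete:ack " + msg);
            }
        });
    }

    // thenApply: the callback runs only on normal completion and has to
    // return the message explicitly so callers still receive it.
    static CompletableFuture<String> viaThenApply(CompletableFuture<String> receive) {
        return receive.thenApply(msg -> {
            if (msg != null) {
                events.add("thenApply:ack " + msg);
            }
            return msg;
        });
    }

    public static void main(String[] args) {
        System.out.println(viaWhenComplete(CompletableFuture.completedFuture("m1")).join());
        System.out.println(viaThenApply(CompletableFuture.completedFuture("m2")).join());

        // A failed receive: whenComplete's callback still runs, thenApply's is skipped.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("receive failed"));
        viaWhenComplete(failed);
        viaThenApply(failed);

        System.out.println(events);
    }
}
```

In both variants the caller gets the same message or the same failure; the difference is only whether the callback is invoked on the error path. A null check on the message may still be wanted with `thenApply`, since a future can complete normally with `null`.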



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -217,4 +224,57 @@ public void testPublishNullValue() throws Exception {
         assertEquals(tv1.size(), 1);
         assertEquals(tv.get("key2"), "value2");
     }
+
+    @DataProvider(name = "partitionedTopic")
+    public static Object[][] partitioned() {
+        return new Object[][] {{true}, {false}};

Review Comment:
   I ran the test on my laptop.
   It looks like the partitioned-topic case always passes, even without the fix.
   The non-partitioned-topic case works as intended: without the fix I see the warning log below, and after applying the fix the test passes.
   
   ```
   2022-09-20T20:05:46,640 - WARN  - [pulsar-client-internal-39-1:ReaderImpl@184] - [persistent://public/default/tableview-no-partition-ack-test][reader-57f0eede9d] acknowledge message null cumulative fail.
   org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Cannot handle message with null messageId
   	at org.apache.pulsar.client.impl.ConsumerBase.validateMessageId(ConsumerBase.java:358) ~[classes/:?]
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:554) ~[classes/:?]
   	at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732) ~[?:?]
   	at org.mockito.internal.util.reflection.InstrumentationMemberAccessor$Dispatcher$ByteBuddy$ktU1uSvS.invokeWithArguments(Unknown Source) ~[?:?]
   	at org.mockito.internal.util.reflection.InstrumentationMemberAccessor.invoke(InstrumentationMemberAccessor.java:239) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.util.reflection.ModuleMemberAccessor.invoke(ModuleMemberAccessor.java:55) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.tryInvoke(MockMethodAdvice.java:333) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.access$500(MockMethodAdvice.java:60) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice$RealMethodCall.invoke(MockMethodAdvice.java:253) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:142) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:45) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.Answers.answer(Answers.java:99) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:110) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82) ~[mockito-core-3.12.4.jar:?]
   	at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:151) ~[mockito-core-3.12.4.jar:?]
   	at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeCumulativeAsync(ConsumerBase.java:554) ~[classes/:?]
   	at org.apache.pulsar.client.impl.ReaderImpl.lambda$readNextAsync$3(ReaderImpl.java:183) ~[classes/:?]
   	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
   	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
   	at org.apache.pulsar.client.impl.ConsumerImpl.lambda$internalReceiveAsync$4(ConsumerImpl.java:487) ~[classes/:?]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at java.lang.Thread.run(Thread.java:833) ~[?:?]
   ```


