Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/18 12:30:57 UTC

[GitHub] [pulsar] leizhiyuan opened a new pull request, #16655: fix:ReconsumeLater will hang up if retryLetterProducer create fail or…

leizhiyuan opened a new pull request, #16655:
URL: https://github.com/apache/pulsar/pull/16655

   … exception
   
   Fixes #16653


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] tisonkun commented on a diff in pull request #16655: fix:ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#discussion_r924087864


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java:
##########
@@ -457,4 +462,58 @@ public void testRetryTopicByCustomTopicName() throws Exception {
         checkConsumer.close();
     }
 
+
+    @Test
+    public void testRetryTopicException() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 1;
+        // subscribe before publish
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .receiverQueueSize(100)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .build())
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+
+        // mock a retry producer exception when reconsumelater is called
+        MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer;
+        List<ConsumerImpl> consumers = multiTopicsConsumer.getConsumers();
+        for (ConsumerImpl c : consumers) {
+            Set<Field> deadLetterPolicyField =
+                    ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
+
+            if (deadLetterPolicyField.size() != 0) {
+                Field field = deadLetterPolicyField.iterator().next();
+                field.setAccessible(true);
+                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
+                deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
+            }
+        }

Review Comment:
   ```suggestion
           MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
           List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
           for (ConsumerImpl<byte[]> c : consumers) {
               c.getDeadLetterPolicy().setRetryLetterTopic("#persistent://invlaid-topic#");
           }
   ```
   
   Instead of depending on reflection, I'd prefer to add a `@VisibleForTesting` method `getDeadLetterPolicy` in `ConsumerImpl`. I don't know whether that is the convention in Pulsar, though.
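
To make the two options concrete, here is a minimal sketch comparing a reflective field lookup with a package-private accessor. It is not Pulsar code: `FakeConsumer`, `FakePolicy`, and `viaReflection` are hypothetical stand-ins for `ConsumerImpl`, `DeadLetterPolicy`, and the reflection step in the test, and the accessor is only described as test-visible in a comment so that the example needs nothing beyond the JDK.

```java
import java.lang.reflect.Field;

class PolicyAccessSketch {

    /** Hypothetical stand-in for DeadLetterPolicy. */
    static class FakePolicy {
        private String retryLetterTopic = "retry";

        String getRetryLetterTopic() {
            return retryLetterTopic;
        }

        void setRetryLetterTopic(String topic) {
            this.retryLetterTopic = topic;
        }
    }

    /** Hypothetical stand-in for ConsumerImpl. */
    static class FakeConsumer {
        private final FakePolicy deadLetterPolicy = new FakePolicy();

        // Assumption: in real code this accessor would be marked as visible for testing only.
        FakePolicy getDeadLetterPolicy() {
            return deadLetterPolicy;
        }
    }

    /** Reflection route: the field is located by its name, which the compiler cannot check. */
    static FakePolicy viaReflection(FakeConsumer consumer) {
        try {
            Field field = FakeConsumer.class.getDeclaredField("deadLetterPolicy");
            field.setAccessible(true);
            return (FakePolicy) field.get(consumer);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("field lookup failed", e);
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        // Accessor route: one call, checked at compile time.
        consumer.getDeadLetterPolicy().setRetryLetterTopic("#invalid#");
        // Both routes reach the same policy object.
        System.out.println(viaReflection(consumer).getRetryLetterTopic());
    }
}
```

The accessor costs one extra method on the production class; the reflective route keeps the production class untouched but moves the field name into a string.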



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java:
##########
@@ -457,4 +462,58 @@ public void testRetryTopicByCustomTopicName() throws Exception {
         checkConsumer.close();
     }
 
+
+    @Test

Review Comment:
   ```suggestion
       @Test(timeOut = 30000L)
   ```
   
   Since the original failure is a hang, we should add a timeout here.
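
The reasoning can be sketched with plain JDK classes, leaving TestNG out so the example stays self-contained: waiting on a future that nobody completes blocks indefinitely unless the wait itself is bounded. `hangsFor` is a hypothetical helper name, and the 100 ms bound is arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {

    /** Returns true if the future is still incomplete after waiting for the given time. */
    static boolean hangsFor(CompletableFuture<?> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return false; // completed normally within the bound
        } catch (TimeoutException e) {
            return true; // the bounded wait gave up instead of blocking forever
        } catch (ExecutionException e) {
            return false; // completed exceptionally, which is still a completion
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> neverCompleted = new CompletableFuture<>();
        System.out.println(hangsFor(neverCompleted, 100));                          // true
        System.out.println(hangsFor(CompletableFuture.completedFuture(null), 100)); // false
    }
}
```

A test-level timeout plays the same role for the whole test method: a regression shows up as a failed test rather than a stuck build.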





[GitHub] [pulsar] tisonkun commented on a diff in pull request #16655: [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#discussion_r925105287


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java:
##########
@@ -457,4 +462,58 @@ public void testRetryTopicByCustomTopicName() throws Exception {
         checkConsumer.close();
     }
 
+
+    @Test
+    public void testRetryTopicException() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 1;
+        // subscribe before publish
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .receiverQueueSize(100)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .build())
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+
+        // mock a retry producer exception when reconsumelater is called
+        MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer;
+        List<ConsumerImpl> consumers = multiTopicsConsumer.getConsumers();
+        for (ConsumerImpl c : consumers) {
+            Set<Field> deadLetterPolicyField =
+                    ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
+
+            if (deadLetterPolicyField.size() != 0) {
+                Field field = deadLetterPolicyField.iterator().next();
+                field.setAccessible(true);
+                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
+                deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
+            }
+        }

Review Comment:
   @Technoboy- fair enough. @leizhiyuan You may still want to update the generic type parameters, though.
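
As a small, Pulsar-independent illustration of why the type parameters are worth adding: a raw type lets a wrong element slip into a collection with only a compiler warning, and the failure appears later at the point of use. `rawListFailsLate` is a hypothetical name used only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class RawTypeSketch {

    /** Returns true if the wrong element is only detected when it is read back. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean rawListFailsLate() {
        List<String> topics = new ArrayList<>();
        List raw = topics; // raw type: the compiler warns but does not reject this
        raw.add(42);       // an Integer ends up in a list declared to hold Strings
        try {
            String first = topics.get(0); // the implicit cast to String fails here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rawListFailsLate()); // true
    }
}
```

With the element type declared on every reference, the `add(42)` line would not compile in the first place.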





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16655: fix:ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#discussion_r925096586


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java:
##########
@@ -457,4 +462,58 @@ public void testRetryTopicByCustomTopicName() throws Exception {
         checkConsumer.close();
     }
 
+
+    @Test
+    public void testRetryTopicException() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 1;
+        // subscribe before publish
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .receiverQueueSize(100)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .build())
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+
+        // mock a retry producer exception when reconsumelater is called
+        MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer;
+        List<ConsumerImpl> consumers = multiTopicsConsumer.getConsumers();
+        for (ConsumerImpl c : consumers) {
+            Set<Field> deadLetterPolicyField =
+                    ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
+
+            if (deadLetterPolicyField.size() != 0) {
+                Field field = deadLetterPolicyField.iterator().next();
+                field.setAccessible(true);
+                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
+                deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
+            }
+        }

Review Comment:
   No need to add `@VisibleForTesting` for this test.





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16655: fix:ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#discussion_r923343099


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -729,6 +730,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
                 Set<MessageId> messageIds = Collections.singleton(messageId);
                 unAckedMessageTracker.remove(messageId);
                 redeliverUnacknowledgedMessages(messageIds);
+                result.complete(null);

Review Comment:
   result.completeExceptionally(x);
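
For readers following along, the difference under discussion can be sketched outside Pulsar. The code below is an assumption-laden illustration, not the actual `ConsumerImpl` logic: `createRetryProducer`, `reconsumeLaterBuggy`, and `reconsumeLaterFixed` are hypothetical names, and only `CompletableFuture` is a real JDK API. It shows that a failure branch which never touches the returned future leaves callers waiting, while completing it exceptionally hands the error back.

```java
import java.util.concurrent.CompletableFuture;

class ReconsumeFutureSketch {

    /** Hypothetical stand-in for creating the retry-letter producer; rejects an invalid topic. */
    static void createRetryProducer(String topic) {
        if (topic.contains("#")) {
            throw new IllegalArgumentException("Invalid topic name: " + topic);
        }
    }

    /** Buggy shape: the failure branch does its fallback work but never completes the future. */
    static CompletableFuture<Void> reconsumeLaterBuggy(String retryTopic) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            createRetryProducer(retryTopic);
            result.complete(null);
        } catch (RuntimeException e) {
            // Fallback handling would go here; result is left incomplete.
        }
        return result;
    }

    /** Fixed shape: every branch completes the future, here by propagating the failure. */
    static CompletableFuture<Void> reconsumeLaterFixed(String retryTopic) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            createRetryProducer(retryTopic);
            result.complete(null);
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    public static void main(String[] args) {
        String invalidTopic = "#persistent://invalid-topic#";
        System.out.println(reconsumeLaterBuggy(invalidTopic).isDone());                 // false
        System.out.println(reconsumeLaterFixed(invalidTopic).isCompletedExceptionally()); // true
    }
}
```

Calling `result.complete(null)` in the failure branch would also unblock the caller, but the caller could then no longer tell from the future that the retry producer had failed.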





[GitHub] [pulsar] tisonkun commented on a diff in pull request #16655: [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#discussion_r925106149


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java:
##########
@@ -457,4 +462,58 @@ public void testRetryTopicByCustomTopicName() throws Exception {
         checkConsumer.close();
     }
 
+
+    @Test(timeOut = 30000L)
+    public void testRetryTopicException() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 1;
+        // subscribe before publish
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .receiverQueueSize(100)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .build())
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+
+        // mock a retry producer exception when reconsumelater is called
+        MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer;
+        List<ConsumerImpl> consumers = multiTopicsConsumer.getConsumers();
+        for (ConsumerImpl c : consumers) {
+            Set<Field> deadLetterPolicyField =
+                    ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
+
+            if (deadLetterPolicyField.size() != 0) {
+                Field field = deadLetterPolicyField.iterator().next();
+                field.setAccessible(true);
+                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
+                deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
+            }
+        }

Review Comment:
   ```suggestion
           MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
           List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
           for (ConsumerImpl<byte[]> c : consumers) {
               Set<Field> deadLetterPolicyField =
                       ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
   
               if (deadLetterPolicyField.size() != 0) {
                   Field field = deadLetterPolicyField.iterator().next();
                   field.setAccessible(true);
                   DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
                   deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
               }
           }
   ```





[GitHub] [pulsar] leizhiyuan commented on pull request #16655: fix:ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
leizhiyuan commented on PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#issuecomment-1188578578

   > @leizhiyuan thanks for preparing this patch!
   > 
   > Could you add a test case to reproduce the future hanging?
   
   done




[GitHub] [pulsar] tisonkun commented on pull request #16655: fix:ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#issuecomment-1187334720

   @leizhiyuan thanks for preparing this patch!
   
   Could you add a test case to reproduce the future hanging?




[GitHub] [pulsar] tisonkun commented on a diff in pull request #16655: fix:ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#discussion_r924475734


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java:
##########
@@ -457,4 +462,58 @@ public void testRetryTopicByCustomTopicName() throws Exception {
         checkConsumer.close();
     }
 
+
+    @Test
+    public void testRetryTopicException() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 1;
+        // subscribe before publish
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .receiverQueueSize(100)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .build())
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+
+        // mock a retry producer exception when reconsumelater is called
+        MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer;
+        List<ConsumerImpl> consumers = multiTopicsConsumer.getConsumers();
+        for (ConsumerImpl c : consumers) {
+            Set<Field> deadLetterPolicyField =
+                    ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
+
+            if (deadLetterPolicyField.size() != 0) {
+                Field field = deadLetterPolicyField.iterator().next();
+                field.setAccessible(true);
+                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
+                deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
+            }
+        }

Review Comment:
   Reflection is somewhat brittle. I tend to avoid it unless we really need it.
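
One way to see the brittleness, sketched with hypothetical names (`Holder`, `fieldExists`): a field looked up by a string name is only resolved at runtime, so a rename in the target class compiles cleanly and fails, or is silently skipped, when the lookup runs.

```java
import java.lang.reflect.Field;

class ReflectionBrittleSketch {

    /** Hypothetical class whose field used to be called "deadLetterPolicy" and was renamed. */
    static class Holder {
        private final String policy = "retry";
    }

    /** Returns true if a field with this name is declared on the type, false otherwise. */
    static boolean fieldExists(Class<?> type, String name) {
        try {
            Field field = type.getDeclaredField(name);
            return field != null;
        } catch (NoSuchFieldException e) {
            return false; // the stale name is only discovered at runtime
        }
    }

    public static void main(String[] args) {
        System.out.println(fieldExists(Holder.class, "policy"));           // true
        System.out.println(fieldExists(Holder.class, "deadLetterPolicy")); // false
    }
}
```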





[GitHub] [pulsar] tisonkun commented on pull request #16655: [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#issuecomment-1190881418

   cc @codelipenghui @eolivelli 




[GitHub] [pulsar] tisonkun commented on pull request #16655: [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#issuecomment-1195159433

   /pulsarbot run-failure-checks




[GitHub] [pulsar] codelipenghui merged pull request #16655: [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #16655:
URL: https://github.com/apache/pulsar/pull/16655




[GitHub] [pulsar] leizhiyuan commented on a diff in pull request #16655: fix:ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
leizhiyuan commented on code in PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#discussion_r924474336


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java:
##########
@@ -457,4 +462,58 @@ public void testRetryTopicByCustomTopicName() throws Exception {
         checkConsumer.close();
     }
 
+
+    @Test
+    public void testRetryTopicException() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 1;
+        // subscribe before publish
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .receiverQueueSize(100)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .build())
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+
+        // mock a retry producer exception when reconsumelater is called
+        MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer;
+        List<ConsumerImpl> consumers = multiTopicsConsumer.getConsumers();
+        for (ConsumerImpl c : consumers) {
+            Set<Field> deadLetterPolicyField =
+                    ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
+
+            if (deadLetterPolicyField.size() != 0) {
+                Field field = deadLetterPolicyField.iterator().next();
+                field.setAccessible(true);
+                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
+                deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
+            }
+        }

Review Comment:
   Here I only want to trigger an exception. `@VisibleForTesting` would also be OK, but it does not seem necessary.





[GitHub] [pulsar] leizhiyuan commented on a diff in pull request #16655: fix:ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
leizhiyuan commented on code in PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#discussion_r924475575


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java:
##########
@@ -457,4 +462,58 @@ public void testRetryTopicByCustomTopicName() throws Exception {
         checkConsumer.close();
     }
 
+
+    @Test

Review Comment:
   done





[GitHub] [pulsar] github-actions[bot] commented on pull request #16655: fix:ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#issuecomment-1187290469

   @leizhiyuan Please provide a correct documentation label for your PR.
   For instructions, see [Pulsar Documentation Label Guide](https://docs.google.com/document/d/1Qw7LHQdXWBW9t2-r-A7QdFDBwmZh6ytB4guwMoXHqc0).




[GitHub] [pulsar] tisonkun commented on pull request #16655: [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #16655:
URL: https://github.com/apache/pulsar/pull/16655#issuecomment-1196335007

   @codelipenghui @hangc0276 I think we should be able to include this fix in the 2.11.0 release.

