You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by 丛搏 <bo...@apache.org> on 2023/03/11 07:16:32 UTC

Re: [DISCUSS] Retry topic should not create for a retry topic

+1, I agree with you.

We should prohibit users from creating Retry Topic and DLQ Topic in a
loop. It will make uncontrollable behavior.
If we allow that, It may cause great trouble to users.

Thanks,
Bo


Asaf Mesika <as...@gmail.com> 于2023年2月14日周二 03:02写道:

>
> Sounds like a bug for sure.
> How did you plan on solving it?
>
>
> On Mon, Feb 13, 2023 at 12:46 AM Enrico Olivelli <eo...@gmail.com>
> wrote:
>
> > Il Dom 12 Feb 2023, 04:42 Yubiao Feng <yubiao.feng@streamnative.io
> > .invalid>
> > ha scritto:
> >
> > > Hi Enrico Olivelli
> > >
> > > > It is good to help users to not fall into bad situations but on the
> > other
> > > case we cannot deal with many silly configurations that you could set up,
> > > like creating a pipeline of functions that in the end create a cycle.
> > >
> > > Sorry, this test just helps to reproduce the problem quickly. The reality
> > > is that there is only one consumer, but every restart triggers this issue
> > > and ends up with a topic like this:
> > > "persistent://public/default/tp1-sub1-RETRY-sub1-RETRY-sub1-RETRY...."
> > >
> > > > I wonder if we could simply document this fact instead of adding code
> > >
> > > ```java
> > > Consumer<> consumer = pulsarClient.newConsumer()
> > > .topicsPattern("my-property/my-ns/.*").subscriptionName("sub1")
> > > .enableRetry(true)
> > > ```
> > >
> > > With the client restarted, the code above will reproduce the problem.
> > >
> >
> >
> > I see the problem now.
> >
> > We must do something for this case. It must not happen. We have to fix it
> >
> > Thanks for your clarification
> >
> > Enrico
> >
> > >
> > > On Sun, Feb 12, 2023 at 3:31 AM Enrico Olivelli <eo...@gmail.com>
> > > wrote:
> > >
> > > > Yubiao,
> > > >
> > > > Il Sab 11 Feb 2023, 19:06 Yubiao Feng <yubiao.feng@streamnative.io
> > > > .invalid>
> > > > ha scritto:
> > > >
> > > > > Hi community
> > > > >
> > > > > I am starting a DISCUSS for "Retry topic should not create for a
> > retry
> > > > > topic."
> > > > >
> > > > > If we use regex-topic consumer and enable retry, it is possible to
> > > create
> > > > > such a topic
> > > > >
> > "persistent://public/default/tp1-sub1-RETRY-sub2-RETRY-sub3-RETRY....".
> > > > You
> > > > > can reproduce this by using the test below.
> > > > >
> > > > > It probably doesn't make sense to create a RETRY/DLQ topic on
> > > RETRY/DLQ.
> > > > We
> > > > > should avoid this scenario if users use the default configuration
> > > (users
> > > > > can enable it if they need it).
> > > > >
> > > >
> > > > I agree that this is a bad case.
> > > > But should we really care?
> > > >
> > > > You must do it very intentionally.
> > > > It is good to help users to not fall into bad situations but on the
> > other
> > > > case we cannot deal with many silly configurations that you could set
> > up,
> > > > like creating a pipeline of functions that in the end create a cycle.
> > > >
> > > >
> > > > I wonder if we could simply document this fact instead of adding code
> > > >
> > > >
> > > > Enrico
> > > >
> > > >
> > > >
> > > >
> > > > > ```java
> > > > >     @Test
> > > > >     public void testRetryTopicWillNotCreatedForRetryTopic() throws
> > > > > Exception {
> > > > >         final String topic = "persistent://my-property/my-ns/tp1";
> > > > >         Producer<byte[]> producer =
> > > > pulsarClient.newProducer(Schema.BYTES)
> > > > >                 .topic(topic)
> > > > >                 .create();
> > > > >         for (int i = 0; i < 100; i++) {
> > > > >             producer.send(String.format("Hello Pulsar [%d]",
> > > > > i).getBytes());
> > > > >         }
> > > > >         producer.close();
> > > > >
> > > > >         for (int i =0; i< 10; i++) {
> > > > >             Consumer<byte[]> consumer =
> > > > > pulsarClient.newConsumer(Schema.BYTES)
> > > > >                     .topicsPattern("my-property/my-ns/.*")
> > > > >                     .subscriptionName("sub" + i)
> > > > >                     .enableRetry(true)
> > > > >
> > > > >
> > > >
> > >
> > .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build())
> > > > >
> > > > > .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
> > > > >                     .subscribe();
> > > > >             Message<byte[]> message = consumer.receive();
> > > > >             log.info("consumer received message : {} {}",
> > > > > message.getMessageId(), new String(message.getData()));
> > > > >             consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
> > > > >             consumer.close();
> > > > >         }
> > > > >
> > > > >         Set<String> tps =
> > > > >
> > > > >
> > > >
> > >
> > pulsar.getBrokerService().getTopics().keys().stream().collect(Collectors.toSet());
> > > > >         try {
> > > > >             for (String tp : tps) {
> > > > >                 assertTrue(howManyKeyWordRetryInTopicName(tp,
> > > > > RETRY_GROUP_TOPIC_SUFFIX) <= 1, tp);
> > > > >                 assertTrue(howManyKeyWordRetryInTopicName(tp,
> > > > > DLQ_GROUP_TOPIC_SUFFIX) <= 1, tp);
> > > > >             }
> > > > >         } finally {
> > > > >             // cleanup.
> > > > >             for (String tp : tps){
> > > > >                 if (tp.startsWith(topic)) {
> > > > >                     admin.topics().delete(tp ,true);
> > > > >                 }
> > > > >             }
> > > > >         }
> > > > >     }
> > > > >
> > > > >     private int howManyKeyWordRetryInTopicName(String topicName,
> > String
> > > > > keyWord) {
> > > > >         int retryCountInTopicName = 0;
> > > > >         String tpCp = topicName;
> > > > >         while (true) {
> > > > >             int index = tpCp.indexOf(keyWord);
> > > > >             if (index < 0) {
> > > > >                 break;
> > > > >             }
> > > > >             tpCp = tpCp.substring(index + keyWord.length());
> > > > >             retryCountInTopicName++;
> > > > >         }
> > > > >         return  retryCountInTopicName;
> > > > >     }
> > > > > ```
> > > > >
> > > > > Thanks
> > > > > Yubiao Feng
> > > > >
> > > >
> > >
> >