Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/28 13:34:20 UTC

[GitHub] [pulsar] mattisonchao opened a new pull request #14912: [improve][client] Avoid reconnecting when ``RawReader`` is seeking.

mattisonchao opened a new pull request #14912:
URL: https://github.com/apache/pulsar/pull/14912


   Fixes #14882
   
   Master Issue: #14882
   
   ### Motivation
   
   The ``compactor`` uses a durable ``RawReader`` to read messages. When the compactor invokes ``doCompact`` and reaches ``TwoPhaseCompactor#phaseTwo``, it performs a seek operation, as shown below:
   
   ```java
   // TwoPhaseCompactor.class  lines 198 - 207 
   private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to,
               MessageId lastReadId, Map<String, MessageId> latestForKey, BookKeeper bk, LedgerHandle ledger) {
           CompletableFuture<Long> promise = new CompletableFuture<>();
   
           reader.seekAsync(from).thenCompose((v) -> {
               Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
               CompletableFuture<Void> loopPromise = new CompletableFuture<Void>();
               phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
               return loopPromise;
               }) // ... remainder of the method omitted in this excerpt
   ```
   The broker then disconnects all consumers and resets the cursor.
   
   Since the ``RawReader`` is a durable consumer from the broker's perspective, it tries to reconnect while the broker is still resetting the cursor and storing the cursor information in the metadata. Each attempt fails with a ``Subscription is fenced`` exception until the cursor has been reset successfully.
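   
   The fencing behavior described above can be pictured with a small toy model. This is only a sketch and not Pulsar code: the class ``FencedSubscriptionSketch`` and all of its methods are hypothetical names, and the fence is reduced to a single boolean flag.
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   /** Toy model (hypothetical, not Pulsar code) of a subscription fenced during a cursor reset. */
   final class FencedSubscriptionSketch {
       private final AtomicBoolean fenced = new AtomicBoolean(false);
   
       /** Broker side: a seek fences the subscription before the cursor reset starts. */
       void beginCursorReset() {
           fenced.set(true);
       }
   
       /** Broker side: the fence is lifted only after the reset has completed. */
       void completeCursorReset() {
           fenced.set(false);
       }
   
       /** Consumer side: a (re)connect attempt is rejected while the fence is up. */
       void connect() {
           if (fenced.get()) {
               throw new IllegalStateException("Subscription is fenced");
           }
       }
   
       /** Returns true if a connect attempt is currently accepted. */
       boolean tryConnect() {
           try {
               connect();
               return true;
           } catch (IllegalStateException e) {
               return false;
           }
       }
   
       public static void main(String[] args) {
           FencedSubscriptionSketch subscription = new FencedSubscriptionSketch();
           subscription.beginCursorReset();
           // A reconnect that arrives during the reset is rejected.
           System.out.println("connect during reset accepted: " + subscription.tryConnect());
           subscription.completeCursorReset();
           // Once the reset has finished, the same attempt succeeds.
           System.out.println("connect after reset accepted: " + subscription.tryConnect());
       }
   }
   ```
   
   In this model, any reconnect issued between ``beginCursorReset`` and ``completeCursorReset`` is wasted work, which is the situation this pull request tries to avoid.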
   
   ### Modifications
   
   - Avoid reconnecting when ``RawReader`` is seeking.
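   
   One way to picture this modification is a client-side guard that defers reconnects while a seek is in flight. The sketch below is an illustration under assumptions and is not the code from this pull request: ``SeekAwareReconnectSketch``, its fields, and its methods are hypothetical, and the broker's seek is represented by a plain ``CompletableFuture``.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;
   
   /** Hypothetical sketch: defer reconnects until an in-flight seek has finished. */
   final class SeekAwareReconnectSketch {
       private final AtomicBoolean seeking = new AtomicBoolean(false);
       private final AtomicBoolean reconnectDeferred = new AtomicBoolean(false);
       private final AtomicInteger reconnectAttempts = new AtomicInteger();
   
       /** Marks the reader as seeking until the broker-side seek (modeled as a future) completes. */
       CompletableFuture<Void> seekAsync(CompletableFuture<Void> brokerSeek) {
           seeking.set(true);
           return brokerSeek.whenComplete((ignored, error) -> {
               seeking.set(false);
               // Perform the reconnect that was postponed during the seek, if any.
               if (reconnectDeferred.getAndSet(false)) {
                   reconnect();
               }
           });
       }
   
       /** Called when the broker closes the connection. */
       void onDisconnected() {
           if (seeking.get()) {
               // The subscription is still fenced, so reconnecting now would fail.
               reconnectDeferred.set(true);
               return;
           }
           reconnect();
       }
   
       private void reconnect() {
           reconnectAttempts.incrementAndGet();
       }
   
       int reconnectAttempts() {
           return reconnectAttempts.get();
       }
   
       public static void main(String[] args) {
           SeekAwareReconnectSketch reader = new SeekAwareReconnectSketch();
           CompletableFuture<Void> brokerSeek = new CompletableFuture<>();
           reader.seekAsync(brokerSeek);
           reader.onDisconnected(); // deferred, because the seek is still in flight
           System.out.println("attempts during seek: " + reader.reconnectAttempts());
           brokerSeek.complete(null); // seek finished, deferred reconnect runs
           System.out.println("attempts after seek: " + reader.reconnectAttempts());
       }
   }
   ```
   
   The guard keeps a single deferred reconnect rather than a queue, on the assumption that one reconnect after the seek is enough to restore the consumer.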
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   It is hard to write a test that mocks the cursor-reset delay (does anyone have a good idea?).
   
   To simulate the delay, I temporarily added test code to the source as follows:
   
   ```java
   // PersistentSubscription: callback passed to cursor.asyncResetCursor
                   cursor.asyncResetCursor(finalPosition, forceReset, new AsyncCallbacks.ResetCursorCallback() {
                       @Override
                       public void resetComplete(Object ctx) {
                           if (log.isDebugEnabled()) {
                               log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName,
                                       finalPosition);
                           }
                           if (dispatcher != null) {
                               dispatcher.cursorIsReset();
                           }
                           // ----- test code
                           try {
                               Thread.sleep(2000);
                           } catch (InterruptedException e) {
                               e.printStackTrace();
                           }
                           // ----- test code
                           IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                           future.complete(null);
                       }
   ```
   I then wrote the following test to verify the change:
   
   ```java
   // ReaderTest.class
     public void testSeek() throws Exception {
           final String topic = "persistent://my-property/my-ns/seek";
           @Cleanup
           Producer<byte[]> producer = pulsarClient.newProducer()
                   .topic(topic)
                   .create();
           MessageId firstMessageId = null;
           for (int i = 0; i < 300; i++) {
               MessageId msgId = producer.send(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
               if (firstMessageId == null) {
                   firstMessageId = msgId;
               }
           }
           CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
           RawReader reader = new RawReaderImpl((PulsarClientImpl) pulsarClient, topic, "test", future);
           future.get();
           reader.seekAsync(firstMessageId).get();
           Thread.sleep(10000);
       }
   ```
   
   ### Documentation
   
   - [x] `no-need-doc` 
     
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] mattisonchao removed a comment on pull request #14912: [improve][client] Avoid reconnecting when ``RawReader`` is seeking.

Posted by GitBox <gi...@apache.org>.
mattisonchao removed a comment on pull request #14912:
URL: https://github.com/apache/pulsar/pull/14912#issuecomment-1081248303


   /pulsarbot rerun-failure-checks





[GitHub] [pulsar] mattisonchao commented on pull request #14912: [improve][client] Avoid reconnecting when ``RawReader`` is seeking.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on pull request #14912:
URL: https://github.com/apache/pulsar/pull/14912#issuecomment-1081248303


   /pulsarbot rerun-failure-checks

