Posted to commits@pinot.apache.org by "JeffBolle (via GitHub)" <gi...@apache.org> on 2023/11/28 16:01:15 UTC

[PR] [feature] use async methods to read from Pulsar topic [pinot]

JeffBolle opened a new pull request, #12061:
URL: https://github.com/apache/pinot/pull/12061

   Addresses a ConcurrentModificationException when reading from a Pulsar topic and uses fully async calls to read from a Pulsar Reader.
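
For context, here is a minimal sketch of how this exception class arises in general. The PR text does not include a stack trace, so this is an assumption about the failure mode rather than the actual Pinot code path, and `CmeSketch` and its method are hypothetical names. It shows an `ArrayList` failing fast when it is structurally modified while an iterator is live, which is also what can happen when one thread iterates a list that another thread is still appending to.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical illustration only; not code from the PR.
final class CmeSketch {

  // Returns true if appending to `list` while iterating it triggers the
  // fail-fast check of the list's iterator.
  static boolean failsWhenModifiedDuringIteration(List<Integer> list) {
    try {
      for (Integer value : list) {
        // Structural modification while the for-each iterator is still in use.
        list.add(value + 1);
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    List<Integer> plain = new ArrayList<>(List.of(1, 2));
    System.out.println(failsWhenModifiedDuringIteration(plain)); // prints "true"
  }
}
```

A snapshot-iterating collection such as `CopyOnWriteArrayList` does not throw here, because its iterator walks the array that existed when iteration started.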


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1859958543

   > This PR started from addressing a concurrency issue when interacting with the List used to hold messages pulled from Pulsar after the read timeout expired. It then evolved into a refactor to use the asynchronous methods of reading from the Pulsar topic to get the Future to place the read timeout on, rather than using a single threaded executor.
   
   Thanks a lot for the contribution! Looks good overall; I just have one comment.


Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1830274214

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12061?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Comparison is base [(`a37ced6`)](https://app.codecov.io/gh/apache/pinot/commit/a37ced6ec998aa771a71e6eba7624942b66d34b4?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.61% compared to head [(`ae1d6b4`)](https://app.codecov.io/gh/apache/pinot/pull/12061?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 46.74%.
   
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #12061       +/-   ##
   =============================================
   - Coverage     61.61%   46.74%   -14.88%     
   + Complexity     1152      943      -209     
   =============================================
     Files          2386     1788      -598     
     Lines        129579    93953    -35626     
     Branches      20056    15192     -4864     
   =============================================
   - Hits          79843    43915    -35928     
   - Misses        43923    46915     +2992     
   + Partials       5813     3123     -2690     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-14.75%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-14.74%)` | :arrow_down: |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-14.88%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-14.88%)` | :arrow_down: |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-0.20%)` | :arrow_down: |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12061?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   


Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on code in PR #12061:
URL: https://github.com/apache/pinot/pull/12061#discussion_r1429804967


##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java:
##########
@@ -75,81 +71,68 @@ public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,
     final MessageId endMessageId =
         endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) endMsgOffset).getMessageId();
 
-    List<PulsarStreamMessage> messagesList = new ArrayList<>();
-    Future<PulsarMessageBatch> pulsarResultFuture =
-        _executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messagesList));
+    final Collection<PulsarStreamMessage> messages = Collections.synchronizedList(new ArrayList<>());
+
+    CompletableFuture<PulsarMessageBatch> pulsarResultFuture = fetchMessagesAsync(startMessageId, endMessageId,
+        messages)
+        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
+        .handle((v, t) -> {
+          if (!(t instanceof TimeoutException)) {
+            LOGGER.warn("Error while fetching records from Pulsar", t);
+          }
+          return new PulsarMessageBatch(buildOffsetFilteringIterable(messages, startMessageId, endMessageId),
+              _enableKeyValueStitch);
+        });
 
     try {
-      return pulsarResultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
-    } catch (TimeoutException e) {
-      // The fetchMessages has thrown an exception. Most common cause is the timeout.
-      // We return the records fetched till now along with the next start offset.
-      pulsarResultFuture.cancel(true);
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId),
-          _enableKeyValueStitch);
+      return pulsarResultFuture.get();
     } catch (Exception e) {
       LOGGER.warn("Error while fetching records from Pulsar", e);
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId),
+      return new PulsarMessageBatch(buildOffsetFilteringIterable(messages, startMessageId, endMessageId),
           _enableKeyValueStitch);
     }
   }
 
-  public PulsarMessageBatch fetchMessages(MessageId startMessageId, MessageId endMessageId,
-      List<PulsarStreamMessage> messagesList) {
-    try {
-      _reader.seek(startMessageId);
-
-      while (_reader.hasMessageAvailable()) {
-        Message<byte[]> nextMessage = _reader.readNext();
-
-        if (endMessageId != null) {
-          if (nextMessage.getMessageId().compareTo(endMessageId) > 0) {
-            break;
-          }
-        }
-        messagesList.add(
-            PulsarUtils.buildPulsarStreamMessage(nextMessage, _enableKeyValueStitch, _pulsarMetadataExtractor));
+  public CompletableFuture<Void> fetchMessagesAsync(MessageId startMessageId, MessageId endMessageId,
+      Collection<PulsarStreamMessage> messages) {
+      CompletableFuture<Void> seekFut = _reader.seekAsync(startMessageId);
+      return seekFut.thenCompose((v) -> fetchNextMessageAndAddToCollection(endMessageId, messages));
+  }
 
-        if (Thread.interrupted()) {
-          break;
-        }
-      }
+  public CompletableFuture<Void> fetchNextMessageAndAddToCollection(MessageId endMessageId,
+      Collection<PulsarStreamMessage> messages) {
+    CompletableFuture<Boolean> hasMessagesFut = _reader.hasMessageAvailableAsync();
+    CompletableFuture<Message<byte[]>> messageFut = hasMessagesFut.thenCompose(msgAvailable ->
+        (msgAvailable) ? _reader.readNextAsync() : CompletableFuture.completedFuture(null));
+    CompletableFuture<Void> handleMessageFut = messageFut.thenCompose(messageOrNull ->
+        readMessageAndFetchNextOrComplete(endMessageId, messages, messageOrNull));
+    return handleMessageFut;
+  }
 
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId),
-          _enableKeyValueStitch);
-    } catch (PulsarClientException e) {
-      LOGGER.warn("Error consuming records from Pulsar topic", e);
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId),
-          _enableKeyValueStitch);
+  public CompletableFuture<Void> readMessageAndFetchNextOrComplete(MessageId endMessageId,
+      Collection<PulsarStreamMessage> messages, Message<byte[]> messageOrNull) {
+    if (messageOrNull == null) {
+      return CompletableFuture.completedFuture(null);
     }
+    if (endMessageId != null && messageOrNull.getMessageId().compareTo(endMessageId) > 0) {
+      return CompletableFuture.completedFuture(null);
+    }
+    messages.add(PulsarUtils.buildPulsarStreamMessage(messageOrNull, _enableKeyValueStitch, _pulsarMetadataExtractor));
+    return fetchNextMessageAndAddToCollection(endMessageId, messages);

Review Comment:
   Can we avoid recursion here and use an iterative implementation instead?
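
One possible reading of this request, as a hedged sketch rather than the change that was actually made: keep using the async read call, but drive it from a plain loop on the calling thread and wait on each future only for the time left before a deadline. The `Supplier<CompletableFuture<T>>` below is a hypothetical stand-in for the reader (completing with `null` when nothing more is available), and `IterativeFetchSketch` and `fetchUntilDeadline` are illustrative names, not Pinot or Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical illustration only; not code from the PR.
final class IterativeFetchSketch {

  // Reads items one at a time until the source yields null, a read fails,
  // or the overall deadline passes. Returns whatever was collected so far.
  static <T> List<T> fetchUntilDeadline(Supplier<CompletableFuture<T>> readNextAsync, long timeoutMillis) {
    List<T> messages = new ArrayList<>();
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (true) {
      long remainingNanos = deadlineNanos - System.nanoTime();
      if (remainingNanos <= 0) {
        break; // overall read timeout reached
      }
      CompletableFuture<T> next = readNextAsync.get();
      try {
        T item = next.get(remainingNanos, TimeUnit.NANOSECONDS);
        if (item == null) {
          break; // stand-in convention: null means no more messages
        }
        messages.add(item);
      } catch (TimeoutException e) {
        // Assumption: cancelling the pending read is acceptable for the real reader.
        next.cancel(true);
        break;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (ExecutionException e) {
        break; // a failed read ends the batch; partial results are still returned
      }
    }
    return messages;
  }

  public static void main(String[] args) {
    Iterator<Integer> it = List.of(1, 2, 3).iterator();
    Supplier<CompletableFuture<Integer>> source =
        () -> CompletableFuture.completedFuture(it.hasNext() ? it.next() : null);
    System.out.println(fetchUntilDeadline(source, 1000)); // prints "[1, 2, 3]"
  }
}
```

Because the list is only touched by the calling thread, this shape needs no synchronized collection, and the call depth stays constant regardless of how many messages are read.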



Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang closed pull request #12061: [feature] use async methods to read from Pulsar topic 
URL: https://github.com/apache/pinot/pull/12061


Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]

Posted by "JeffBolle (via GitHub)" <gi...@apache.org>.
JeffBolle commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1855876618

   This PR started from addressing a concurrency issue when interacting with the List used to hold messages pulled from Pulsar after the read timeout expired. It then evolved into a refactor to use the asynchronous methods of reading from the Pulsar topic to get the Future to place the read timeout on, rather than using a single threaded executor.
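
The idea of placing the timeout on the future itself can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's code: `OrTimeoutSketch` and `snapshotOnCompletionOrTimeout` are hypothetical names, and the `fetch` future stands in for the chained async read. `CompletableFuture.orTimeout` requires Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not code from the PR.
final class OrTimeoutSketch {

  // Waits for `fetch` to finish or time out, then returns a copy of whatever
  // has been collected so far. `collected` is expected to be a list from
  // Collections.synchronizedList, since a reader may still be appending to it.
  static <T> List<T> snapshotOnCompletionOrTimeout(CompletableFuture<Void> fetch, List<T> collected,
      long timeoutMillis) {
    return fetch
        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
        .handle((ignored, error) -> {
          // Copy under the list's lock so later appends cannot disturb the result.
          synchronized (collected) {
            return new ArrayList<>(collected);
          }
        })
        .join();
  }

  public static void main(String[] args) {
    List<String> collected = Collections.synchronizedList(new ArrayList<>());
    collected.add("m1");
    CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
    // The future times out after about 100 ms and the partial result is returned.
    System.out.println(snapshotOnCompletionOrTimeout(neverCompletes, collected, 100)); // prints "[m1]"
  }
}
```

Note that `orTimeout` only completes the future it is called on; any work already started upstream may keep running, which is why the sketch returns a copy instead of the live list.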


Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]

Posted by "enzo-dechaene (via GitHub)" <gi...@apache.org>.
enzo-dechaene commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1973221062

   Hi @KKcorps, can we merge this PR into the 1.1.0 release?


Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1979756517

   There are test failures. @JeffBolle Can you take a look?


Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-2065467205

   This no longer applies after #12812.

