Posted to commits@pinot.apache.org by "JeffBolle (via GitHub)" <gi...@apache.org> on 2023/11/28 16:01:15 UTC
[PR] [feature] use async methods to read from Pulsar topic [pinot]
JeffBolle opened a new pull request, #12061:
URL: https://github.com/apache/pinot/pull/12061
Addresses ConcurrentModificationException in reading from Pulsar topic and uses fully async calls to read from a Pulsar Reader.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org
Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]
Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1859958543
> This PR started from addressing a concurrency issue when interacting with the List used to hold messages pulled from Pulsar after the read timeout expired. It then evolved into a refactor that uses the asynchronous methods for reading from the Pulsar topic, so that the read timeout is placed on the returned Future rather than on a single-threaded executor.
Thanks a lot for the contribution! Looks good overall; I just have one comment.
Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]
Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1830274214
## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12061?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
All modified and coverable lines are covered by tests :white_check_mark:
> Comparison is base [(`a37ced6`)](https://app.codecov.io/gh/apache/pinot/commit/a37ced6ec998aa771a71e6eba7624942b66d34b4?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.61% compared to head [(`ae1d6b4`)](https://app.codecov.io/gh/apache/pinot/pull/12061?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 46.74%.
<details><summary>Additional details and impacted files</summary>
```diff
@@ Coverage Diff @@
## master #12061 +/- ##
=============================================
- Coverage 61.61% 46.74% -14.88%
+ Complexity 1152 943 -209
=============================================
Files 2386 1788 -598
Lines 129579 93953 -35626
Branches 20056 15192 -4864
=============================================
- Hits 79843 43915 -35928
- Misses 43923 46915 +2992
+ Partials 5813 3123 -2690
```
| [Flag](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration1](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration2](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-11](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-21](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-14.75%)` | :arrow_down: |
| [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-14.74%)` | :arrow_down: |
| [temurin](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-14.88%)` | :arrow_down: |
| [unittests](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-14.88%)` | :arrow_down: |
| [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.74% <ø> (-0.20%)` | :arrow_down: |
| [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12061/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
</details>
[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12061?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]
Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on code in PR #12061:
URL: https://github.com/apache/pinot/pull/12061#discussion_r1429804967
##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java:
##########
@@ -75,81 +71,68 @@ public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,
final MessageId endMessageId =
endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) endMsgOffset).getMessageId();
- List<PulsarStreamMessage> messagesList = new ArrayList<>();
- Future<PulsarMessageBatch> pulsarResultFuture =
- _executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messagesList));
+ final Collection<PulsarStreamMessage> messages = Collections.synchronizedList(new ArrayList<>());
+
+ CompletableFuture<PulsarMessageBatch> pulsarResultFuture = fetchMessagesAsync(startMessageId, endMessageId,
+ messages)
+ .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
+ .handle((v, t) -> {
+ if (!(t instanceof TimeoutException)) {
+ LOGGER.warn("Error while fetching records from Pulsar", t);
+ }
+ return new PulsarMessageBatch(buildOffsetFilteringIterable(messages, startMessageId, endMessageId),
+ _enableKeyValueStitch);
+ });
try {
- return pulsarResultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- // The fetchMessages has thrown an exception. Most common cause is the timeout.
- // We return the records fetched till now along with the next start offset.
- pulsarResultFuture.cancel(true);
- return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId),
- _enableKeyValueStitch);
+ return pulsarResultFuture.get();
} catch (Exception e) {
LOGGER.warn("Error while fetching records from Pulsar", e);
- return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId),
+ return new PulsarMessageBatch(buildOffsetFilteringIterable(messages, startMessageId, endMessageId),
_enableKeyValueStitch);
}
}
- public PulsarMessageBatch fetchMessages(MessageId startMessageId, MessageId endMessageId,
- List<PulsarStreamMessage> messagesList) {
- try {
- _reader.seek(startMessageId);
-
- while (_reader.hasMessageAvailable()) {
- Message<byte[]> nextMessage = _reader.readNext();
-
- if (endMessageId != null) {
- if (nextMessage.getMessageId().compareTo(endMessageId) > 0) {
- break;
- }
- }
- messagesList.add(
- PulsarUtils.buildPulsarStreamMessage(nextMessage, _enableKeyValueStitch, _pulsarMetadataExtractor));
+ public CompletableFuture<Void> fetchMessagesAsync(MessageId startMessageId, MessageId endMessageId,
+ Collection<PulsarStreamMessage> messages) {
+ CompletableFuture<Void> seekFut = _reader.seekAsync(startMessageId);
+ return seekFut.thenCompose((v) -> fetchNextMessageAndAddToCollection(endMessageId, messages));
+ }
- if (Thread.interrupted()) {
- break;
- }
- }
+ public CompletableFuture<Void> fetchNextMessageAndAddToCollection(MessageId endMessageId,
+ Collection<PulsarStreamMessage> messages) {
+ CompletableFuture<Boolean> hasMessagesFut = _reader.hasMessageAvailableAsync();
+ CompletableFuture<Message<byte[]>> messageFut = hasMessagesFut.thenCompose(msgAvailable ->
+ (msgAvailable) ? _reader.readNextAsync() : CompletableFuture.completedFuture(null));
+ CompletableFuture<Void> handleMessageFut = messageFut.thenCompose(messageOrNull ->
+ readMessageAndFetchNextOrComplete(endMessageId, messages, messageOrNull));
+ return handleMessageFut;
+ }
- return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId),
- _enableKeyValueStitch);
- } catch (PulsarClientException e) {
- LOGGER.warn("Error consuming records from Pulsar topic", e);
- return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId),
- _enableKeyValueStitch);
+ public CompletableFuture<Void> readMessageAndFetchNextOrComplete(MessageId endMessageId,
+ Collection<PulsarStreamMessage> messages, Message<byte[]> messageOrNull) {
+ if (messageOrNull == null) {
+ return CompletableFuture.completedFuture(null);
}
+ if (endMessageId != null && messageOrNull.getMessageId().compareTo(endMessageId) > 0) {
+ return CompletableFuture.completedFuture(null);
+ }
+ messages.add(PulsarUtils.buildPulsarStreamMessage(messageOrNull, _enableKeyValueStitch, _pulsarMetadataExtractor));
+ return fetchNextMessageAndAddToCollection(endMessageId, messages);
Review Comment:
Can we avoid recursion here and use an iterative implementation instead?
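One way to read the request above is a plain loop that waits on each async call with whatever time is left before the deadline. The sketch below is not code from the PR: `AsyncSource` is a hypothetical stand-in for the reader's `hasMessageAvailableAsync()` and `readNextAsync()` calls, `drain` and `fromIterator` are illustrative names, and the end-offset check from the diff is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class IterativeAsyncDrain {
  /** Hypothetical stand-in for the two async reader calls used in the diff. */
  interface AsyncSource<T> {
    CompletableFuture<Boolean> hasNextAsync();

    CompletableFuture<T> readNextAsync();
  }

  /** Reads until the source is exhausted or the deadline passes; returns what was read so far. */
  static <T> List<T> drain(AsyncSource<T> source, long timeoutMillis) {
    List<T> out = new ArrayList<>();
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    try {
      // Overflow-safe form of "now < deadline".
      while (System.nanoTime() - deadline < 0) {
        boolean available = source.hasNextAsync().get(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
        if (!available) {
          break;
        }
        out.add(source.readNextAsync().get(deadline - System.nanoTime(), TimeUnit.NANOSECONDS));
      }
    } catch (InterruptedException e) {
      // Preserve the interrupt for the caller and return the partial batch.
      Thread.currentThread().interrupt();
    } catch (ExecutionException | TimeoutException e) {
      // Read failure or deadline hit mid-call: fall through with the partial batch.
    }
    return out;
  }

  /** Test helper (illustrative): wraps an iterator in already-completed futures. */
  static <T> AsyncSource<T> fromIterator(Iterator<T> it) {
    return new AsyncSource<T>() {
      @Override
      public CompletableFuture<Boolean> hasNextAsync() {
        return CompletableFuture.completedFuture(it.hasNext());
      }

      @Override
      public CompletableFuture<T> readNextAsync() {
        return CompletableFuture.completedFuture(it.next());
      }
    };
  }

  public static void main(String[] args) {
    System.out.println(drain(fromIterator(List.of("m1", "m2", "m3").iterator()), 1000));
  }
}
```

Because only the calling thread touches the list, this shape needs no synchronized collection. The trade-off is that the caller blocks on every message, which the method in the diff already does when it calls `get()` on the combined future.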
Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]
Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang closed pull request #12061: [feature] use async methods to read from Pulsar topic
URL: https://github.com/apache/pinot/pull/12061
Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]
Posted by "JeffBolle (via GitHub)" <gi...@apache.org>.
JeffBolle commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1855876618
This PR started from addressing a concurrency issue when interacting with the List used to hold messages pulled from Pulsar after the read timeout expired. It then evolved into a refactor that uses the asynchronous methods for reading from the Pulsar topic, so that the read timeout is placed on the returned Future rather than on a single-threaded executor.
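The pattern described here can be sketched with JDK classes alone. This is an illustration under stated assumptions, not the PR's code: `PartialBatchOnTimeout`, `collectWithTimeout`, and `isUnexpectedFailure` are hypothetical names, and a never-completing future stands in for a stalled reader. `orTimeout` requires Java 9 or later and does not stop the underlying work, so the list is copied under its lock before being handed back.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PartialBatchOnTimeout {
  /** True only for real failures: not success (t == null) and not a timeout. */
  static boolean isUnexpectedFailure(Throwable t) {
    if (t == null) {
      return false;
    }
    // Dependent stages may wrap the original cause in CompletionException.
    Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    return !(cause instanceof TimeoutException);
  }

  /** Waits at most timeoutMillis for fetch, then returns a snapshot of whatever reached the sink. */
  static <T> List<T> collectWithTimeout(CompletableFuture<Void> fetch, List<T> sink, long timeoutMillis) {
    return fetch
        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
        .handle((v, t) -> {
          if (isUnexpectedFailure(t)) {
            System.err.println("Error while fetching records: " + t);
          }
          // Guard the copy against an add from a fetch that is still running.
          synchronized (sink) {
            return new ArrayList<T>(sink);
          }
        })
        .join();
  }

  public static void main(String[] args) {
    List<String> sink = Collections.synchronizedList(new ArrayList<>());
    sink.add("m1");
    // A future that never completes stands in for a stalled reader.
    System.out.println(collectWithTimeout(new CompletableFuture<>(), sink, 100));
  }
}
```

Note that the throwable passed to `handle` is null when the stage succeeds, so a check that only excludes `TimeoutException` would also log on the success path; the helper above tests for null first.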
Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]
Posted by "enzo-dechaene (via GitHub)" <gi...@apache.org>.
enzo-dechaene commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1973221062
Hi @KKcorps, can we merge this PR into the 1.1.0 release?
Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]
Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-1979756517
There are test failures. @JeffBolle, can you take a look?
Re: [PR] [feature] use async methods to read from Pulsar topic [pinot]
Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #12061:
URL: https://github.com/apache/pinot/pull/12061#issuecomment-2065467205
This no longer applies after #12812.