Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/03/11 22:26:21 UTC

[GitHub] [pinot] npawar commented on a change in pull request #8017: Fetch Pulsar offsets from Consumer interface instead of Reader

npawar commented on a change in pull request #8017:
URL: https://github.com/apache/pinot/pull/8017#discussion_r825154864



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -124,19 +127,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
 
         for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
 
-          Reader reader =
-              _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
-                  .create();
+          Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p))

Review comment:
       `getPartitionedTopicName` makes a call to `_pulsarClient.getPartitionsForTopic(_topic).get()` for every partition. Can we call it just once before the loop?
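
   A minimal sketch of hoisting the lookup out of the loop. `TopicClient` is a hypothetical stand-in that models only the `getPartitionsForTopic` call mentioned above, and `newPartitionTopicNames` is an illustrative helper name, not a method in this PR. The sketch uses `join()` rather than `get()` only to avoid checked exceptions in a short example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class PartitionLookupSketch {
  /** Hypothetical stand-in for the client; models only the one call discussed above. */
  interface TopicClient {
    CompletableFuture<List<String>> getPartitionsForTopic(String topic);
  }

  /** Returns the partitioned topic names from newPartitionStartIndex onward. */
  static List<String> newPartitionTopicNames(TopicClient client, String topic, int newPartitionStartIndex) {
    // One lookup before the loop instead of one lookup per partition.
    List<String> partitionedTopicNameList = client.getPartitionsForTopic(topic).join();

    List<String> newPartitions = new ArrayList<>();
    for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
      // Index into the cached list; no further client calls inside the loop.
      newPartitions.add(partitionedTopicNameList.get(p));
    }
    return newPartitions;
  }
}
```

   With this shape the loop body only indexes into the cached list, so the number of client calls no longer grows with the number of new partitions.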

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -124,19 +127,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
 
         for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
 
-          Reader reader =
-              _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
-                  .create();
+          Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p))

Review comment:
       Same as for the consumer created at the top of `fetchStreamPartitionOffset`: should we close every new consumer created in this loop?

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -80,16 +84,15 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
     Preconditions.checkNotNull(offsetCriteria);
     try {
       MessageId offset = null;
+      Consumer consumer =

Review comment:
       We're reading earliest/largest in so many places now.
   In `PulsarConfig`:
   Based on `OffsetCriteria` smallest/largest/custom, we get
   `InitialMessageID` earliest/latest/custom.
   Based on `InitialMessageID`, we get
   `SubscriberInitialPosition` Earliest/Latest/Latest.
   Is it okay to have `SubscriberInitialPosition`=Latest for a custom `OffsetCriteria`? Should we even keep custom offset criteria in Pulsar? I see that `fetchStreamPartitionOffset` doesn't support it.
   
   Further, in `PulsarStreamMetadataProvider#fetchStreamPartitionOffset`, the Consumer is created based on `SubscriberInitialPosition`. But based on the `offsetCriteria` param in that method, we call either `getLastMessageId` or `receive` on the consumer. What if `PulsarConfig` had `largest` `OffsetCriteria`, but the param in this method has `smallest`?
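
   To make the chain concrete, here is a rough sketch of the two-step mapping as I read it. The enum and method names are hypothetical stand-ins, not the actual `PulsarConfig` code; the point is only that the custom case collapses to Latest after the second step.

```java
final class OffsetMappingSketch {
  // Hypothetical stand-ins for the three notions of "where to start".
  enum Criteria { SMALLEST, LARGEST, CUSTOM }
  enum InitialMessageId { EARLIEST, LATEST, CUSTOM }
  enum InitialPosition { EARLIEST, LATEST }

  /** Step 1: offset criteria smallest/largest/custom maps to earliest/latest/custom. */
  static InitialMessageId toInitialMessageId(Criteria criteria) {
    switch (criteria) {
      case SMALLEST:
        return InitialMessageId.EARLIEST;
      case LARGEST:
        return InitialMessageId.LATEST;
      default:
        return InitialMessageId.CUSTOM;
    }
  }

  /** Step 2: earliest/latest/custom maps to Earliest/Latest/Latest. */
  static InitialPosition toInitialPosition(InitialMessageId messageId) {
    return messageId == InitialMessageId.EARLIEST ? InitialPosition.EARLIEST : InitialPosition.LATEST;
  }

  /** Both steps combined; CUSTOM ends up indistinguishable from LARGEST. */
  static InitialPosition resolve(Criteria criteria) {
    return toInitialPosition(toInitialMessageId(criteria));
  }
}
```

   In this sketch `resolve(Criteria.CUSTOM)` and `resolve(Criteria.LARGEST)` give the same position, which is the case being questioned above.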

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -124,19 +127,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
 
         for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
 
-          Reader reader =
-              _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
-                  .create();
+          Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p))
+              .subscriptionInitialPosition(_config.getInitialSubscriberPosition())
+              .subscriptionName(ConsumerName.generateRandomName()).subscribe();
 
-          if (reader.hasMessageAvailable()) {
-            Message message = reader.readNext();
+          Message message = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);

Review comment:
       Maybe add a comment here that the offsetCriteria from the StreamConfig param supplied to this method is ignored on purpose when deciding the offset, and that "earliest" is purposely used for all new partitions.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -80,16 +84,15 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
     Preconditions.checkNotNull(offsetCriteria);
     try {
       MessageId offset = null;
+      Consumer consumer =
+          _pulsarClient.newConsumer().topic(_topic)
+              .subscriptionInitialPosition(_config.getInitialSubscriberPosition())
+              .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe();
+
       if (offsetCriteria.isLargest()) {
-        _reader.seek(MessageId.latest);
-        if (_reader.hasMessageAvailable()) {
-          offset = _reader.readNext().getMessageId();
-        }
+        offset = consumer.getLastMessageId();
       } else if (offsetCriteria.isSmallest()) {
-        _reader.seek(MessageId.earliest);
-        if (_reader.hasMessageAvailable()) {
-          offset = _reader.readNext().getMessageId();
-        }
+        offset = consumer.receive().getMessageId();

Review comment:
       You probably need to close the Consumer at the end?
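
   One way this could look, as a sketch only: `OffsetConsumer` and `ConsumerFactory` are hypothetical stand-ins that model just the calls used here, and the sketch assumes the real consumer type can be used in try-with-resources in the same way. The offset is represented as a `String` purely for illustration.

```java
final class ConsumerCloseSketch {
  /** Hypothetical stand-in for a consumer; only the calls used below are modeled. */
  interface OffsetConsumer extends AutoCloseable {
    String getLastMessageId();

    @Override
    void close(); // narrowed to throw no checked exception, to keep the sketch short
  }

  /** Hypothetical stand-in for the newConsumer()...subscribe() builder chain. */
  interface ConsumerFactory {
    OffsetConsumer subscribe(String topic);
  }

  static String fetchLargestOffset(ConsumerFactory factory, String topic) {
    // try-with-resources closes the consumer on both normal return and exception.
    try (OffsetConsumer consumer = factory.subscribe(topic)) {
      return consumer.getLastMessageId();
    }
  }
}
```

   The same pattern would apply inside the per-partition loop, with one try-with-resources block per consumer.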




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


