You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/13 06:55:29 UTC

[GitHub] [pinot] KKcorps commented on a diff in pull request #9893: Pulsar Connection handler should not spin up a consumer / reader

KKcorps commented on code in PR #9893:
URL: https://github.com/apache/pinot/pull/9893#discussion_r1046721249


##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java:
##########
@@ -46,11 +47,18 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
     implements PartitionGroupConsumer {
   private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
   private final ExecutorService _executorService;
+  private final Reader _reader;
   private boolean _enableKeyValueStitch = false;
 
   public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig,
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
-    super(clientId, streamConfig, partitionGroupConsumptionStatus.getPartitionGroupId());
+    super(clientId, streamConfig);
+    PulsarConfig config = new PulsarConfig(streamConfig, clientId);
+    _reader = createReaderForPartition(config.getPulsarTopicName(),

Review Comment:
   We should also close `_reader` in `PulsarPartitionLevelConsumer.close` method



##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java:
##########
@@ -98,8 +100,8 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
       }
       return new MessageIdStreamOffset(offset);
     } catch (PulsarClientException e) {
-      LOGGER.error("Cannot fetch offsets for partition " + _partition + " and topic " + _topic + " and offsetCriteria "
-          + offsetCriteria, e);
+      LOGGER.error("Cannot fetch offsets for partition " + _partition + " and topic " + _topic

Review Comment:
   We can revert this formatting change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org