Posted to commits@pinot.apache.org by "suddendust (via GitHub)" <gi...@apache.org> on 2024/02/29 09:06:53 UTC

[PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

suddendust opened a new pull request, #12522:
URL: https://github.com/apache/pinot/pull/12522

   This PR adds two metrics in `RealtimeSegmentDataManager`:
   1. `REALTIME_LAST_FETCHED_BATCH_SIZE`: This is a new gauge that can assume the following values:
   
   - 0: When there were no messages in the last batch. This can happen when there are no messages, or only a very low volume of messages, in the stream partition.
   - Greater than 0: When messages were fetched in the last batch.
   - -1: When there were exceptions fetching the message batch from the stream partition.
   
   This metric can be used to identify the 1st case of no messages as:
   
   `sum by (table) (pinot_server_realtimeLastFetchedBatchSizeCount{}) == 0`. We can use this source alert to inhibit other alerts. For example, an alert on realtime ingestion stopped.
   
   **Why can't we reuse `pinot_server_realtimeRowsConsumed`?**
   
   `pinot_server_realtimeRowsConsumed` only counts rows that were successfully indexed. Rows that failed transformation or indexing are not counted, which makes it hard to calculate the total number of rows fetched from the stream partition.
   
   2. `STREAM_CONSUMER_CREATE_EXCEPTIONS`: Incremented when we face an exception while trying to create a stream consumer.
   
   Testing:
   
   `STREAM_CONSUMER_CREATE_EXCEPTIONS`:
   
   <img width="1225" alt="Screenshot 2024-02-29 at 12 30 43 PM" src="https://github.com/apache/pinot/assets/84911643/f9427cfc-85ac-43c4-a40c-b33bd7a88198">
   
   `REALTIME_ROWS_FETCHED`:
   
   <img width="1639" alt="Screenshot 2024-02-29 at 1 34 11 PM" src="https://github.com/apache/pinot/assets/84911643/5fb3020e-f5df-4272-87cc-fb85d5842e4a">
   
   
   <img width="1510" alt="Screenshot 2024-02-29 at 12 35 19 PM" src="https://github.com/apache/pinot/assets/84911643/fa636fcf-e665-479f-8344-09237f20054c">
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "suddendust (via GitHub)" <gi...@apache.org>.
suddendust commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1511610005


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1661,22 +1666,35 @@ private void makeStreamConsumer(String reason) {
       closePartitionGroupConsumer();
     }
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _partitionGroupConsumer =
+          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+      _partitionGroupConsumer.start(_currentOffset);
+    } catch (Exception e) {
+      _segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}",
+          _clientId, reason, e);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
+      throw e;
+    }
   }
 
   /**
    * Checkpoints existing consumer before creating a new consumer instance
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-    _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
-    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-    closePartitionGroupConsumer();
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
+      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+      closePartitionGroupConsumer();

Review Comment:
   In
   ```
        1.  _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
        2.  closePartitionGroupConsumer();
        3.  _partitionGroupConsumer =
             _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
         _partitionGroupConsumer.start(_currentOffset);
   ```
   If 1st or 2nd throws an exception, 3rd won't be executed - This is a valid case of incrementing this metric, no?
   
   >then the exception is not related to consumer creation.
   
   This metric tracks if a consumer cannot be created for any reason. In this case as well, the consumer isn't created. So we should bump it? 
   
   Earlier, I was swallowing this exception, so no runtime exception would bubble up the call stack. Re-throwing addresses that.
   
   Please correct me if I am wrong.
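
   To make the argument concrete, here is a minimal sketch of the "count, then re-throw" behaviour with the wide try block. It is only an illustration: the `Runnable` steps and the `AtomicLong` are hypothetical stand-ins for the real consumer calls and for `ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS`, not the actual Pinot code.

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Illustrative sketch only: Runnable steps and an AtomicLong stand in for the
   // real consumer calls and for the STREAM_CONSUMER_CREATE_EXCEPTIONS meter.
   class CountAndRethrowSketch {
     static void recreate(Runnable checkpoint, Runnable close, Runnable createAndStart, AtomicLong meter) {
       try {
         checkpoint.run();      // step 1
         close.run();           // step 2
         createAndStart.run();  // step 3
       } catch (RuntimeException e) {
         // Any failure here means no new consumer exists, so count it ...
         meter.incrementAndGet();
         // ... and re-throw so the caller still sees the failure.
         throw e;
       }
     }

     public static void main(String[] args) {
       AtomicLong meter = new AtomicLong();
       try {
         // Step 2 fails, so step 3 never runs.
         recreate(() -> { }, () -> { throw new IllegalStateException("close failed"); }, () -> { }, meter);
       } catch (IllegalStateException e) {
         System.out.println("caller saw: " + e.getMessage());
       }
       System.out.println("meter = " + meter.get());
     }
   }
   ```

   With this shape, a failure in step 1 or 2 is counted and is also visible to the caller.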





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "gortiz (via GitHub)" <gi...@apache.org>.
gortiz commented on PR #12522:
URL: https://github.com/apache/pinot/pull/12522#issuecomment-1971375101

   Small tip: instead of rate you can also use [increase](https://prometheus.io/docs/prometheus/latest/querying/functions/#increase). I don't think the difference in performance would be important enough to change it, but it may express what you want more clearly.
   
   Anyway I think both functions are fine.




Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "suddendust (via GitHub)" <gi...@apache.org>.
suddendust commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1507986026


##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -35,12 +35,14 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   DELETED_SEGMENT_COUNT("segments", false),
   DELETE_TABLE_FAILURES("tables", false),
   REALTIME_ROWS_CONSUMED("rows", true),
+  REALTIME_ROWS_FETCHED("rows", false),
   REALTIME_ROWS_FILTERED("rows", false),
   INVALID_REALTIME_ROWS_DROPPED("rows", false),
   INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
   REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
   REALTIME_OFFSET_COMMITS("commits", true),
   REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
+  STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false),

Review Comment:
   That code wasn't checked-in, sorry about that!





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "gortiz (via GitHub)" <gi...@apache.org>.
gortiz commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1510775058


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -427,6 +427,9 @@ protected boolean consumeLoop()
       try {
         messageBatch =
             _partitionGroupConsumer.fetchMessages(_currentOffset, null, _streamConfig.getFetchTimeoutMillis());
+        //track realtime rows fetched on a table level. This included valid + invalid rows
+        _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FETCHED,
+            messageBatch.getMessageCount());

Review Comment:
   From the interface:
   ```java
     /**
      * @return number of available messages
      */
     int getMessageCount();
   
     /**
      * @return number of messages returned from the stream
      */
     default int getUnfilteredMessageCount() {
       return getMessageCount();
     }
   ```
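
   A small sketch of why the two counts can differ. `Batch` is a simplified copy of the two methods quoted above, and `FilteringBatch` is a hypothetical implementation made up for illustration (it drops empty payloads); neither is the real Pinot class.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   class UnfilteredCountSketch {
     // Simplified stand-in for the two interface methods quoted above.
     interface Batch {
       int getMessageCount();

       default int getUnfilteredMessageCount() {
         return getMessageCount();
       }
     }

     // Hypothetical batch that drops empty payloads before exposing messages.
     static final class FilteringBatch implements Batch {
       private final int _received;
       private final List<String> _kept;

       FilteringBatch(List<String> raw) {
         _received = raw.size();
         _kept = raw.stream().filter(m -> !m.isEmpty()).collect(Collectors.toList());
       }

       @Override
       public int getMessageCount() {
         return _kept.size();  // messages still available after filtering
       }

       @Override
       public int getUnfilteredMessageCount() {
         return _received;     // everything the stream returned
       }
     }

     public static void main(String[] args) {
       Batch batch = new FilteringBatch(Arrays.asList("a", "", "b", ""));
       System.out.println("available  = " + batch.getMessageCount());
       System.out.println("unfiltered = " + batch.getUnfilteredMessageCount());
     }
   }
   ```

   A meter meant to track everything fetched from the stream would want the second number.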





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "zhtaoxiang (via GitHub)" <gi...@apache.org>.
zhtaoxiang commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1507951724


##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -35,12 +35,14 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   DELETED_SEGMENT_COUNT("segments", false),
   DELETE_TABLE_FAILURES("tables", false),
   REALTIME_ROWS_CONSUMED("rows", true),
+  REALTIME_ROWS_FETCHED("rows", false),
   REALTIME_ROWS_FILTERED("rows", false),
   INVALID_REALTIME_ROWS_DROPPED("rows", false),
   INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
   REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
   REALTIME_OFFSET_COMMITS("commits", true),
   REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
+  STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false),

Review Comment:
   This newly created metric is not used anywhere.





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "zhtaoxiang (via GitHub)" <gi...@apache.org>.
zhtaoxiang commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1508002971


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1661,22 +1666,35 @@ private void makeStreamConsumer(String reason) {
       closePartitionGroupConsumer();
     }
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _partitionGroupConsumer =
+          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+      _partitionGroupConsumer.start(_currentOffset);
+    } catch (Exception e) {
+      _segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}",
+          _clientId, reason, e);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
+      throw e;
+    }
   }
 
   /**
    * Checkpoints existing consumer before creating a new consumer instance
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-    _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
-    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-    closePartitionGroupConsumer();
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
+      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+      closePartitionGroupConsumer();

Review Comment:
   Is it possible that those 2 lines throw exceptions? If so, then the exception is not related to consumer creation.
   
   Should we wrap only lines 1690 and 1691 in the try/catch?
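
   A minimal sketch of the narrower scope suggested here, assuming the checkpoint and close steps are left outside the try block. The `Runnable` steps and the `AtomicLong` are hypothetical stand-ins for the real calls and for the meter.

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Illustrative sketch only: only the create/start step is guarded by the meter.
   class NarrowScopeSketch {
     static void recreate(Runnable checkpoint, Runnable close, Runnable createAndStart, AtomicLong meter) {
       // Failures in these two steps propagate without touching the creation meter.
       checkpoint.run();
       close.run();
       try {
         createAndStart.run();
       } catch (RuntimeException e) {
         // Only a failure while creating or starting the consumer is counted.
         meter.incrementAndGet();
         throw e;
       }
     }

     public static void main(String[] args) {
       AtomicLong meter = new AtomicLong();
       try {
         recreate(() -> { }, () -> { throw new IllegalStateException("close failed"); }, () -> { }, meter);
       } catch (IllegalStateException e) {
         System.out.println("after close failure, meter = " + meter.get());
       }
       try {
         recreate(() -> { }, () -> { }, () -> { throw new IllegalStateException("create failed"); }, meter);
       } catch (IllegalStateException e) {
         System.out.println("after create failure, meter = " + meter.get());
       }
     }
   }
   ```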





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "gortiz (via GitHub)" <gi...@apache.org>.
gortiz commented on PR #12522:
URL: https://github.com/apache/pinot/pull/12522#issuecomment-1970846422

   I like the idea, but I'm not sure if the current implementation is correct. Specifically, the scenario that concerns me is the one where:
   - we fetch batches far more often than Prometheus polls
   - the source mostly emits non-empty batches
   
   In that scenario it seems possible that `REALTIME_LAST_FETCHED_BATCH_SIZE` would be 0 each time Prometheus polls it even if we are actually ingesting.
   
   Instead of being a gauge, we could have something like `REALTIME_FETCHED_ROWS`, which could be a counter. This metric would be non-decreasing and we could just apply `rate` (or some other operation) to it. If it doesn't increase for a while, we can be sure that there is no data in the source and therefore we can skip some alerts.
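
   To illustrate the concern, here is a small simulation of several fetches happening between two scrapes. It does not use any Pinot or Prometheus API; the two `AtomicLong` values are hypothetical stand-ins for a "last batch size" gauge and a "rows fetched" counter.

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Illustrative simulation only: compares what a scrape would observe from a
   // last-value gauge and from a monotonically increasing counter.
   class GaugeVsCounterSketch {
     /** Returns {gauge value at scrape time, counter increase since the previous scrape}. */
     static long[] scrapeAfter(int[] batchSizes) {
       AtomicLong lastBatchSizeGauge = new AtomicLong();
       AtomicLong rowsFetchedCounter = new AtomicLong();
       for (int size : batchSizes) {
         lastBatchSizeGauge.set(size);        // gauge: overwritten on every fetch
         rowsFetchedCounter.addAndGet(size);  // counter: only ever grows
       }
       return new long[] {lastBatchSizeGauge.get(), rowsFetchedCounter.get()};
     }

     public static void main(String[] args) {
       // Three non-empty batches, then an empty one right before the scrape.
       long[] observed = scrapeAfter(new int[] {500, 480, 510, 0});
       System.out.println("gauge at scrape  = " + observed[0]);
       System.out.println("counter increase = " + observed[1]);
     }
   }
   ```

   The gauge only reflects the last fetch before the scrape, while the counter keeps the rows from every fetch in between.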




Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "gortiz (via GitHub)" <gi...@apache.org>.
gortiz commented on PR #12522:
URL: https://github.com/apache/pinot/pull/12522#issuecomment-1977047357

   LGTM (this time for sure! :D). I'll wait a bit in case some other committer can take a look




Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "zhtaoxiang (via GitHub)" <gi...@apache.org>.
zhtaoxiang commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1509633595


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1661,22 +1666,35 @@ private void makeStreamConsumer(String reason) {
       closePartitionGroupConsumer();
     }
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _partitionGroupConsumer =
+          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+      _partitionGroupConsumer.start(_currentOffset);
+    } catch (Exception e) {
+      _segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}",
+          _clientId, reason, e);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
+      throw e;
+    }
   }
 
   /**
    * Checkpoints existing consumer before creating a new consumer instance
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-    _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
-    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-    closePartitionGroupConsumer();
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
+      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+      closePartitionGroupConsumer();

Review Comment:
   If they throw an exception, is the old consumer closed? If not, the consumer is still alive, right?





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "suddendust (via GitHub)" <gi...@apache.org>.
suddendust commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1511195857


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -427,6 +427,9 @@ protected boolean consumeLoop()
       try {
         messageBatch =
             _partitionGroupConsumer.fetchMessages(_currentOffset, null, _streamConfig.getFetchTimeoutMillis());
+        //track realtime rows fetched on a table level. This included valid + invalid rows
+        _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FETCHED,
+            messageBatch.getMessageCount());

Review Comment:
   Thanks for pointing this out





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "suddendust (via GitHub)" <gi...@apache.org>.
suddendust commented on PR #12522:
URL: https://github.com/apache/pinot/pull/12522#issuecomment-1970950356

   That's a valid case.
   
   >If it doesn't increase in a while, we can be sure that there are no data in the source and therefore we can skip some alerts.
   
   We can't differentiate between this (valid) case and the case where there are exceptions fetching a batch. That said, such exceptions can be tracked using `REALTIME_CONSUMPTION_EXCEPTIONS`.






Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "gortiz (via GitHub)" <gi...@apache.org>.
gortiz commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1510767678


##########
docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml:
##########
@@ -77,7 +77,7 @@ rules:
 - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.realtimeConsumptionExceptions\"><>(\\w+)"
   name: "pinot_server_realtime_consumptionExceptions_$1"
   cache: true
-- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.([^\\.]*?)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+).(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsConsumed)\"><>(\\w+)"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.([^\\.]*?)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+).(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsConsumed|realtimeRowsFetched|streamConsumerCreateExceptions)\"><>(\\w+)"

Review Comment:
   Not related to this PR but... Why do we need to add an enumeration here? I think we should just say _any word_ here to fetch everything.
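
   A rough sketch of the difference, using `java.util.regex` on a simplified version of the pattern (only the metric-name part, without the surrounding MBean syntax). The sample names `myTable`, `myTopic` and `someNewMeter` are made up for illustration.

   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   // Illustrative sketch only: a simplified form of the rule above, comparing an
   // enumerated last group with an "any word" last group.
   class MetricPatternSketch {
     static final String PREFIX = "pinot\\.server\\.([^.]*?)_(OFFLINE|REALTIME)-(.+)-(\\w+)\\.";
     static final Pattern ENUMERATED = Pattern.compile(PREFIX + "(realtimeRowsConsumed|realtimeRowsFetched)");
     static final Pattern ANY_WORD = Pattern.compile(PREFIX + "(\\w+)");

     /** Returns the captured meter name, or null when the whole name does not match. */
     static String meterName(Pattern pattern, String metricName) {
       Matcher m = pattern.matcher(metricName);
       return m.matches() ? m.group(5) : null;
     }

     public static void main(String[] args) {
       String known = "pinot.server.myTable_REALTIME-myTopic-0.realtimeRowsFetched";
       String added = "pinot.server.myTable_REALTIME-myTopic-0.someNewMeter";
       System.out.println(meterName(ENUMERATED, known));
       System.out.println(meterName(ENUMERATED, added));
       System.out.println(meterName(ANY_WORD, added));
     }
   }
   ```

   With the enumeration, every new meter needs a config change before it is exported. The any-word form would pick it up automatically, though it might also capture names that other rules are meant to handle, so rule order would need checking.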





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "gortiz (via GitHub)" <gi...@apache.org>.
gortiz commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1510773637


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -427,6 +427,9 @@ protected boolean consumeLoop()
       try {
         messageBatch =
             _partitionGroupConsumer.fetchMessages(_currentOffset, null, _streamConfig.getFetchTimeoutMillis());
+        //track realtime rows fetched on a table level. This included valid + invalid rows
+        _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FETCHED,
+            messageBatch.getMessageCount());

Review Comment:
   I think this is not correct. It should be `messageBatch.getUnfilteredMessageCount()`.





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "suddendust (via GitHub)" <gi...@apache.org>.
suddendust commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1511267734


##########
docker/images/pinot/etc/jmx_prometheus_javaagent/configs/server.yml:
##########
@@ -77,7 +77,7 @@ rules:
 - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.realtimeConsumptionExceptions\"><>(\\w+)"
   name: "pinot_server_realtime_consumptionExceptions_$1"
   cache: true
-- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.([^\\.]*?)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+).(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsConsumed)\"><>(\\w+)"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ServerMetrics\", name=\"pinot.server.([^\\.]*?)_(OFFLINE|REALTIME)\\-(.+)\\-(\\w+).(invalidRealtimeRowsDropped|incompleteRealtimeRowsConsumed|rowsWithErrors|realtimeRowsConsumed|realtimeRowsFetched|streamConsumerCreateExceptions)\"><>(\\w+)"

Review Comment:
   Yeah didn't want to add another change to this PR that would've increased the regression surface. This will be fixed in another PR.





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "suddendust (via GitHub)" <gi...@apache.org>.
suddendust commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1511662491


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1661,22 +1666,35 @@ private void makeStreamConsumer(String reason) {
       closePartitionGroupConsumer();
     }
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _partitionGroupConsumer =
+          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+      _partitionGroupConsumer.start(_currentOffset);
+    } catch (Exception e) {
+      _segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}",
+          _clientId, reason, e);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
+      throw e;
+    }
   }
 
   /**
    * Checkpoints existing consumer before creating a new consumer instance
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-    _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
-    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-    closePartitionGroupConsumer();
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
+      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+      closePartitionGroupConsumer();

Review Comment:
   >that means the old consumer is still running, maybe it can still be used for consuming data?
   
   I am not sure whether the control flow here allows the old consumer to be reused. If it can be reused, then yes, we shouldn't bump the metric. If it can't, then we should.
   
   But look at it this way: we want to be alerted in both cases, because the code tried to create a new consumer and expected to use it, but couldn't for some reason. We definitely want to look into why we couldn't even close the old consumer.





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "gortiz (via GitHub)" <gi...@apache.org>.
gortiz commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1510778929


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1661,22 +1666,35 @@ private void makeStreamConsumer(String reason) {
       closePartitionGroupConsumer();
     }
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _partitionGroupConsumer =
+          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+      _partitionGroupConsumer.start(_currentOffset);
+    } catch (Exception e) {
+      _segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}",
+          _clientId, reason, e);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
+      throw e;
+    }
   }
 
   /**
    * Checkpoints existing consumer before creating a new consumer instance
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-    _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
-    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-    closePartitionGroupConsumer();
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
+      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+      closePartitionGroupConsumer();

Review Comment:
   To be safe, I think we should re-throw the exception in the catch block.





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "zhtaoxiang (via GitHub)" <gi...@apache.org>.
zhtaoxiang commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1511624796


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1661,22 +1666,35 @@ private void makeStreamConsumer(String reason) {
       closePartitionGroupConsumer();
     }
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _partitionGroupConsumer =
+          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+      _partitionGroupConsumer.start(_currentOffset);
+    } catch (Exception e) {
+      _segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}",
+          _clientId, reason, e);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
+      throw e;
+    }
   }
 
   /**
    * Checkpoints existing consumer before creating a new consumer instance
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-    _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
-    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-    closePartitionGroupConsumer();
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
+      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+      closePartitionGroupConsumer();

Review Comment:
   If the 1st or 2nd line throws an exception:
   1. That means the old consumer is still running, so maybe it can still be used for consuming data?
   2. The error is not really about creating a consumer. In this case, maybe we don't want to increase the metric, WDYT?
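
   To make the alternative in point 2 concrete, here is a minimal sketch in which only failures in the create/start step bump the counter, while checkpoint/close failures propagate untouched. The `Consumer` and `ConsumerFactory` interfaces and the `AtomicLong` counter are hypothetical stand-ins, not the real Pinot types or metrics API.

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   public class ConsumerRecreateSketch {
     /** Hypothetical stand-in for PartitionGroupConsumer. */
     public interface Consumer {
       long checkpoint(long offset);
       void start(long offset);
       void close();
     }

     /** Hypothetical stand-in for the stream consumer factory. */
     public interface ConsumerFactory {
       Consumer create();
     }

     // Stand-in for the STREAM_CONSUMER_CREATE_EXCEPTIONS meter.
     public final AtomicLong createExceptions = new AtomicLong();
     private final ConsumerFactory factory;
     private Consumer consumer;
     private long currentOffset;

     public ConsumerRecreateSketch(ConsumerFactory factory, Consumer initial, long offset) {
       this.factory = factory;
       this.consumer = initial;
       this.currentOffset = offset;
     }

     public void recreate() {
       // Failures here propagate without touching the counter, because no
       // creation has been attempted yet.
       currentOffset = consumer.checkpoint(currentOffset);
       consumer.close();
       try {
         consumer = factory.create();
         consumer.start(currentOffset);
       } catch (RuntimeException e) {
         // Only a failure to create or start the new consumer is counted.
         createExceptions.incrementAndGet();
         throw e;
       }
     }
   }
   ```

   Whether this narrower scope is preferable depends on what the alert is meant to capture; the PR as written counts any failure inside the recreate path.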





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "zhtaoxiang (via GitHub)" <gi...@apache.org>.
zhtaoxiang commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1511600582


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1661,22 +1666,35 @@ private void makeStreamConsumer(String reason) {
       closePartitionGroupConsumer();
     }
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _partitionGroupConsumer =
+          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+      _partitionGroupConsumer.start(_currentOffset);
+    } catch (Exception e) {
+      _segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}",
+          _clientId, reason, e);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
+      throw e;
+    }
   }
 
   /**
    * Checkpoints existing consumer before creating a new consumer instance
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-    _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
-    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-    closePartitionGroupConsumer();
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
+      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+      closePartitionGroupConsumer();

Review Comment:
   Can you please help me understand why re-throwing the exception would address the potential issue I mentioned above?





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "xiangfu0 (via GitHub)" <gi...@apache.org>.
xiangfu0 merged PR #12522:
URL: https://github.com/apache/pinot/pull/12522




Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "suddendust (via GitHub)" <gi...@apache.org>.
suddendust commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1511610005


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1661,22 +1666,35 @@ private void makeStreamConsumer(String reason) {
       closePartitionGroupConsumer();
     }
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _partitionGroupConsumer =
+          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+      _partitionGroupConsumer.start(_currentOffset);
+    } catch (Exception e) {
+      _segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}",
+          _clientId, reason, e);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
+      throw e;
+    }
   }
 
   /**
    * Checkpoints existing consumer before creating a new consumer instance
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-    _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
-    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-    closePartitionGroupConsumer();
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
+      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+      closePartitionGroupConsumer();

Review Comment:
   In
   ```
        1.  _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
        2.  closePartitionGroupConsumer();
        3.  _partitionGroupConsumer =
             _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
         _partitionGroupConsumer.start(_currentOffset);
   ```
   If step 1 or 2 throws an exception, step 3 won't be executed. This is a valid case for incrementing this metric, no?
   
   >then the exception is not related to consumer creation.
   
   This metric tracks if a consumer cannot be created for any reason. In this case as well, the consumer isn't created.
   
   Please correct me if I am wrong.





Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12522:
URL: https://github.com/apache/pinot/pull/12522#issuecomment-1970764635

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `0%`, with `86 lines` in your changes missing coverage. Please review.
   > Project coverage is 0.00%. Comparing base [(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`499222b`)](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 30 commits behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...a/manager/realtime/RealtimeSegmentDataManager.java](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUmVhbHRpbWVTZWdtZW50RGF0YU1hbmFnZXIuamF2YQ==) | 0.00% | [84 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...a/org/apache/pinot/common/metrics/ServerGauge.java](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9TZXJ2ZXJHYXVnZS5qYXZh) | 0.00% | [1 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...a/org/apache/pinot/common/metrics/ServerMeter.java](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9TZXJ2ZXJNZXRlci5qYXZh) | 0.00% | [1 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #12522       +/-   ##
   =============================================
   - Coverage     61.75%    0.00%   -61.76%     
   =============================================
     Files          2436     2374       -62     
     Lines        133233   129759     -3474     
     Branches      20636    20131      -505     
   =============================================
   - Hits          82274        0    -82274     
   - Misses        44911   129759    +84848     
   + Partials       6048        0     -6048     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-0.01%)` | :arrow_down: |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (ø)` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.63%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.75%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.76%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12522/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12522?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   




Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "ege-st (via GitHub)" <gi...@apache.org>.
ege-st commented on PR #12522:
URL: https://github.com/apache/pinot/pull/12522#issuecomment-1971166374

   This seems like a situation where a histogram would be the right choice: it would let us do percentile breakdowns of rows read, and I believe it also includes the counter that @gortiz suggested.
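
   As a toy illustration of what a histogram would give us (this is not Pinot's metrics API or any particular metrics library; the class and method names are invented for the sketch), recording each fetched batch size yields both a sample count and percentiles:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   public class BatchSizeHistogramSketch {
     // Keeps every sample; a real histogram would use a bounded reservoir.
     private final List<Long> samples = new ArrayList<>();

     /** Records the number of rows fetched in one batch. */
     public void update(long batchSize) {
       samples.add(batchSize);
     }

     /** Number of batches recorded so far. */
     public long count() {
       return samples.size();
     }

     /** Nearest-rank percentile for p in (0, 100]. */
     public long percentile(double p) {
       if (samples.isEmpty()) {
         throw new IllegalStateException("no samples recorded");
       }
       List<Long> sorted = new ArrayList<>(samples);
       Collections.sort(sorted);
       int rank = (int) Math.ceil(p / 100.0 * sorted.size());
       return sorted.get(Math.max(rank, 1) - 1);
     }
   }
   ```

   With this shape, "no messages in the last batches" shows up as low percentiles sitting at 0, while the count still tells us that fetches are happening.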




Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "gortiz (via GitHub)" <gi...@apache.org>.
gortiz commented on PR #12522:
URL: https://github.com/apache/pinot/pull/12522#issuecomment-1971379244

   LGTM. Unless there is some urgency, I'll wait until tomorrow to merge it, just in case other people want to review it.




Re: [PR] Metrics for Realtime Rows Fetched and Stream Consumer Create Exceptions [pinot]

Posted by "suddendust (via GitHub)" <gi...@apache.org>.
suddendust commented on code in PR #12522:
URL: https://github.com/apache/pinot/pull/12522#discussion_r1508009397


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1661,22 +1666,35 @@ private void makeStreamConsumer(String reason) {
       closePartitionGroupConsumer();
     }
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _partitionGroupConsumer =
+          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+      _partitionGroupConsumer.start(_currentOffset);
+    } catch (Exception e) {
+      _segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}",
+          _clientId, reason, e);
+      _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
+      throw e;
+    }
   }
 
   /**
    * Checkpoints existing consumer before creating a new consumer instance
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-    _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
-    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-    closePartitionGroupConsumer();
-    _partitionGroupConsumer =
-        _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
-    _partitionGroupConsumer.start(_currentOffset);
+    try {
+      _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason);
+      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+      closePartitionGroupConsumer();

Review Comment:
   If they throw an exception, the consumer wouldn't get created, right? And `_currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);` does throw a runtime exception (at least in code).


