Posted to commits@pinot.apache.org by "vrajat (via GitHub)" <gi...@apache.org> on 2023/12/15 10:34:13 UTC

[PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

vrajat opened a new pull request, #12157:
URL: https://github.com/apache/pinot/pull/12157

   Pinot may take multiple hours between polls of a partition in a Kafka topic. One specific example is when Pinot takes a long time to flush a segment to disk. In the meantime, messages in Kafka can expire if the message retention time is small.
   If `auto.offset.reset` is set to `smallest`, then Kafka silently moves the offset to the first available message, leading to data loss.
   Before consuming messages from Kafka, check whether any messages have expired by comparing the `startOffset` in the `RealtimeSegmentDataManager` to the `beginOffset` of the Kafka partition. If `startOffset` < `beginOffset`, then log the condition and set a gauge to 1. The gauge can be connected to an alerting system. A sketch of the idea follows.
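   
   For illustration, a minimal sketch of the detection idea using the plain Kafka consumer API (`beginningOffsets` is a real `KafkaConsumer` method; the logger and gauge hook are hypothetical placeholders, not the exact Pinot change):
   
   ```java
   import java.util.Collections;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   
   // Earliest offset still available in the partition; retention may have advanced it.
   long beginOffset = consumer.beginningOffsets(Collections.singletonList(topicPartition))
       .getOrDefault(topicPartition, 0L);
   if (startOffset < beginOffset) {
     // Messages in [startOffset, beginOffset) have expired and cannot be consumed.
     logger.error("Possible data loss: startOffset {} < beginOffset {}", startOffset, beginOffset);
     dataLossGauge.set(1); // hypothetical gauge hook; connect it to an alerting system
   }
   ```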




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473780835


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,

Review Comment:
   Going back on this as well. Metrics/alerts per partition are not allowed anymore, so the metric/alert is for a table. Logs have more information.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1429452785


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -46,6 +50,20 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
     super(clientId, streamConfig, partition);
   }
 
+  @Override
+  public void validateStreamState(StreamPartitionMsgOffset startMsgOffset) throws PermanentConsumerException {
+    final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
+    Map<TopicPartition, Long> beginningOffsets =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition));
+
+    final long beginningOffset = beginningOffsets.getOrDefault(_topicPartition, 0L);
+    if (startOffset < beginningOffset) {

Review Comment:
   Yes, that is the common scenario. [KafkaConsumer::position](https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#position(org.apache.kafka.common.TopicPartition)) returns the offset of the next record, which is somewhere in the middle or at the end of the stream. Also, `startOffset` is usually the same as `KafkaConsumer::position` if the consumer is keeping up.
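   
   A hedged sketch of that relationship with the raw consumer API (`assign`, `position`, and `beginningOffsets` are real `KafkaConsumer` methods; the topic name and printing are made up):
   
   ```java
   TopicPartition tp = new TopicPartition("cli_profile", 0);
   consumer.assign(Collections.singletonList(tp));
   
   long position = consumer.position(tp); // offset of the next record to be fetched
   long beginning = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
   
   // A consumer that is keeping up has position >= beginning; retention-based
   // deletion can advance the beginning offset past a stalled consumer's position.
   if (position < beginning) {
     System.out.printf("Records in [%d, %d) have expired%n", position, beginning);
   }
   ```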





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "ege-st (via GitHub)" <gi...@apache.org>.
ege-st commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1861814766

   > > We should be able to emit a metric within `PartitionGroupConsumer.fetchMessages()` when the start offset is not available (e.g. the requested offset range is not fully returned).
   > 
   > @Jackie-Jiang Are you suggesting to run this check in `RealtimeSegmentDataManager::consumeLoop` effectively instead of when `PartitionGroupConsumer::start` is called? Will that cause too many checks?
   > 
   > One doubt I have is whether the consumer may fall behind even after the consumer was started. In that case it is better to move the check to `consumeLoop` or `fetchMessages`.
   
   I don't think there will be a noticeable performance impact if the check is done in `fetchMessages` or `consumeLoop` and executed for each message batch. The performance cost should be relatively small compared to everything else that we do in the `consumeLoop`. But it would be awesome to do a performance test just to make sure.




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat closed pull request #12157: Detect expired messages in Kafka. Log and set a gauge.
URL: https://github.com/apache/pinot/pull/12157




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1882746729

   I've added `RealTimeSlowConsumer.java` as a temporary reproducer while I work on an integration test.




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1981792537

   Debug APIs and tests are there to verify that the data loss is detected. Is it OK if one of the PRs has no test to check the changes?




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "ege-st (via GitHub)" <gi...@apache.org>.
ege-st commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1861811576

   It is possible for messages to expire in the middle of the lifetime of a consuming segment. One way this could happen is if ingestion on the table is paused for a period longer than the source's retention; when ingestion is resumed, there would be a gap in messages within the segment.




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1434819332


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,

Review Comment:
   The gauge is for a specific partition. `setValueOfPartitionGauge` takes a partition id as one of the parameters.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473761868


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the stream partition is in a valid state.
+   *
+   * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to
+   * retention configuration of the stream which may lead to missed data.
+   *
+   * @param startOffset The offset of the first message desired, inclusive
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset) {
+    if (_partitionMetadataProvider == null) {
+      createPartitionMetadataProvider("validateStartOffset");
+    }
+
+    try {
+      StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
+          /*maxWaitTimeMs=*/5000
+      );
+      if (streamSmallestOffset.compareTo(startOffset) > 0) {

Review Comment:
   If ingestion is paused and the stream fast-forwards the begin offset, then the data loss error is raised. Actually - this is how I simulate this scenario in the integration test.
   
   For my knowledge, is it possible to specify an offset when resuming the ingestion? If yes, and if the given offset < the beginOffset, the error will be raised. However, that is OK, right? The user expected to get messages from a specific offset and didn't.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473772323


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1711,4 +1725,14 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
   URI createSegmentPath(String rawTableName, String segmentName) {
     return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
   }
+
+  public Map<String, SegmentErrorInfo> getSegmentErrors(String tableNameWithType) {

Review Comment:
   Hmm - I was following the example of `protected LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;` but the same constraint does not apply here. I'll change it.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1867294975

   A simple script to show message expiration in Kafka.
   
   It assumes all the components are installed:
   - Kafka
   - Kafka CLIs
   - Faker
   
   ```
   # Create a topic with a retention of 1 minute
   ./kafka_2.13-3.6.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --config retention.ms=60000 --topic cli_profile
   
   # Produce 5 rows
   faker -r 5 profile | nl -w2 -v 10 -s':' | ./kafka_2.13-3.6.1/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --property parse.key=true --property key.separator=":" --topic cli_profile
   
   # Example row:
   1:{'job': 'Health service manager', 'company': 'Wall, Ballard and Anderson', 'ssn': '729-80-0324', 'residence': '42764 Porter Mountains\nStephenborough, NV 63641', 'current_location': (Decimal('56.096589'), Decimal('-156.143961')), 'blood_group': 'AB+', 'website': ['http://serrano.org/', 'https://huerta-kidd.com/'], 'username': 'martinezchristopher', 'name': 'Linda Taylor', 'sex': 'F', 'address': '0509 Angela Road\nEast Sandraport, MP 39203', 'mail': 'regina54@gmail.com', 'birthdate': datetime.date(1923, 9, 8)}
   
   # Consume rows after a sleep of 90 seconds. None of these rows should be available;
   # kafka-console-consumer.sh stays in a wait state without printing any rows.
   sleep 90
   ./kafka_2.13-3.6.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --property key.separator=":" --topic cli_profile
   ```




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "swaminathanmanish (via GitHub)" <gi...@apache.org>.
swaminathanmanish commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1861764515

   > > We should be able to emit a metric within `PartitionGroupConsumer.fetchMessages()` when the start offset is not available (e.g. the requested offset range is not fully returned).
   > 
   > @Jackie-Jiang Are you suggesting to run this check in `RealtimeSegmentDataManager::consumeLoop` effectively instead of when `PartitionGroupConsumer::start` is called? Will that cause too many checks?
   > 
   > One doubt I have is whether the consumer may fall behind even after the consumer was started. In that case it is better to move the check to `consumeLoop` or `fetchMessages`.
   
   Yes, I think it's possible that records in Kafka can expire while `consumeLoop` is consuming from the stream (beyond the start phase). Filling a realtime segment can take several hours, during which records may expire.
   
   @Jackie-Jiang - Is your intention to add the check to `PartitionGroupConsumer.fetchMessages()` for completeness of the check as well as reuse across stream fetchers?
   
   




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "ege-st (via GitHub)" <gi...@apache.org>.
ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1450846307


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -46,6 +50,20 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
     super(clientId, streamConfig, partition);
   }
 
+  @Override
+  public void validateStreamState(StreamPartitionMsgOffset startMsgOffset) throws PermanentConsumerException {
+    final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
+    Map<TopicPartition, Long> beginningOffsets =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition));
+
+    final long beginningOffset = beginningOffsets.getOrDefault(_topicPartition, 0L);
+    if (startOffset < beginningOffset) {

Review Comment:
   Coming back to this discussion, I realized my confusion: in this case Beginning Offset is the earliest offset in the Kafka topic and Start Offset is where the new Consuming Segment will start reading from.  
   
   I had been thinking in terms of the last offset in the immediately preceding segment.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473755477


##########
pinot-tools/src/main/java/org/apache/pinot/tools/RealTimeSlowConsumer.java:
##########
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
+
+
+public class RealTimeSlowConsumer extends QuickStartBase {
+    private static final String DEFAULT_CONTROLLER_URL = "http://localhost:9000";
+
+    @Override
+    protected Map<String, String> getDefaultStreamTableDirectories() {
+        return ImmutableMap.<String, String>builder()

Review Comment:
   A more existential question: should I keep this quickstart or discard it? It is helpful for playing around with this specific scenario, but I don't see a use for it beyond that. I do have an integration test.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473759897


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the stream partition is in a valid state.
+   *
+   * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to
+   * retention configuration of the stream which may lead to missed data.
+   *
+   * @param startOffset The offset of the first message desired, inclusive
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset) {
+    if (_partitionMetadataProvider == null) {
+      createPartitionMetadataProvider("validateStartOffset");
+    }
+
+    try {
+      StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
+          /*maxWaitTimeMs=*/5000
+      );
+      if (streamSmallestOffset.compareTo(startOffset) > 0) {
+        _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+        String message = "startOffset(" + startOffset
+            + ") is older than topic's beginning offset(" + streamSmallestOffset + ")";
+        _segmentLogger.error(message);
+        _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+            new SegmentErrorInfo(String.valueOf(now()), message, "")

Review Comment:
   Made this change locally.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "ege-st (via GitHub)" <gi...@apache.org>.
ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1428220886


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -46,6 +50,20 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
     super(clientId, streamConfig, partition);
   }
 
+  @Override
+  public void validateStreamState(StreamPartitionMsgOffset startMsgOffset) throws PermanentConsumerException {
+    final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
+    Map<TopicPartition, Long> beginningOffsets =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition));
+
+    final long beginningOffset = beginningOffsets.getOrDefault(_topicPartition, 0L);
+    if (startOffset < beginningOffset) {

Review Comment:
   Is `startOffset > beginningOffset` an acceptable scenario?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,

Review Comment:
   So it's good that we have a gauge per table (because otherwise, a healthy table could reset the gauge from `1` to `0` if it executes this check _after_ an unhealthy table sets the gauge to `1`).
   
   However, what happens if there are multiple partitions on a table and one partition's segment sets this gauge to `1`, then a different partition's segment executes this code and resets the gauge from `1` to `0`? Put another way: if a table has 3 partitions and 1 of them fails `validateStreamState` but the other two pass, then it's very likely that the healthy partitions will mask the unhealthy partition.
   
   We need to make sure that if a segment fails this validation, the gauge stays at `1` until that segment is fixed or there is some other manual intervention. One possible shape for that is sketched below.
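   
   For illustration, one hedged way to avoid the masking is to aggregate per-partition validation results before touching the table-level gauge. This is only a sketch; the set, `recordValidation`, and `setGauge` are made-up scaffolding, not Pinot APIs:
   
   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Partitions whose most recent validateStreamState failed.
   private final Set<Integer> _failedPartitions = ConcurrentHashMap.newKeySet();
   
   void recordValidation(int partitionId, boolean valid) {
     if (valid) {
       _failedPartitions.remove(partitionId);
     } else {
       _failedPartitions.add(partitionId);
     }
     // The table-level gauge stays at 1 as long as any partition is failing.
     setGauge(_failedPartitions.isEmpty() ? 0 : 1);
   }
   ```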



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,
+              ServerGauge.INVALID_REALTIME_STREAM_STATE_EXCEPTION, 0);
+    } catch (PermanentConsumerException pce) {

Review Comment:
   Should we catch this exception here or bubble it up to the next level?





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473760338


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1339,7 +1346,14 @@ private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria
         LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffsetInSegmentZkMetadata,
             streamSmallestOffset, partitionGroupId, tableName);
         _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
-        return streamSmallestOffset;
+        String message = "Data lost from offset: " + startOffsetInSegmentZkMetadata
+                        + " to: " + streamSmallestOffset
+                        + " for partition: " + partitionGroupId
+                        + " of table: " + tableName;
+
+        _errorCache.put(Pair.of(tableName, segmentName),
+            new SegmentErrorInfo(String.valueOf(System.currentTimeMillis()), message, ""));

Review Comment:
   Made this change locally.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473773112


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1711,4 +1725,14 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
   URI createSegmentPath(String rawTableName, String segmentName) {
     return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
   }
+
+  public Map<String, SegmentErrorInfo> getSegmentErrors(String tableNameWithType) {

Review Comment:
   One issue with a 2-level map is that it is harder to control the size. Getting the total number of errors would require iteration.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1986549031

   This PR is superseded by #12608.




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1981974328

   Ideally we want a unit test that directly reads the value from the gauge, instead of relying on a REST API, which is more of an integration test. Currently I see CI test failures, but I'm not sure which part caused them.




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "ege-st (via GitHub)" <gi...@apache.org>.
ege-st commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1889843108

   So to make sure I understand: we'll essentially have a gauge per partition per table. And if a new consuming segment is started where the start offset < the earliest offset still in Kafka, the gauge will be set to `1` for that partition of that table.
   
   That gauge will remain at `1` until the next time a consuming segment is created, and it will be set to `0` if the start offset >= the earliest offset in Kafka.




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1859525432

   > We should be able to emit a metric within `PartitionGroupConsumer.fetchMessages()` when the start offset is not available (e.g. the requested offset range is not fully returned).
   
   @Jackie-Jiang Are you suggesting to run this check in `RealtimeSegmentDataManager::consumeLoop` effectively instead of when `PartitionGroupConsumer::start` is called? Will that cause too many checks?
   
   One doubt I have is whether the consumer may fall behind even after the consumer was started. In that case it is better to move the check to `consumeLoop` or `fetchMessages`.




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1863422625

   The overhead should be minimal if we don't need to make extra remote calls. I'd assume `PartitionGroupConsumer.fetchMessages()` already has the information we need to perform the check.




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1891440840

   > So to make sure I understand: we'll essentially have a gauge per partition per table. And if a new consuming segment is started where the start offset < the earliest offset still in Kafka, the gauge will be set to `1` for that partition of that table.
   > 
   > That gauge will remain at `1` until the next time a consuming segment is created, and it will be set to `0` if the start offset >= the earliest offset in Kafka.
   
   Good questions here; we should discuss what the right thing to do is.
   * I have changed the code to a metric per table. As part of alert clean-up, the SRE team has requested removing per-partition alerts, so the on-call has to refer to logs to find the lagging partitions. We are also changing existing alerts to be table-specific.
   * Another important question is whether ingestion should stop. One worry is that stopping ingestion may make the data loss worse, as more messages may expire.
   * If ingestion continues, then when and how should the metric be reset?
   




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1434819785


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,
+              ServerGauge.INVALID_REALTIME_STREAM_STATE_EXCEPTION, 0);
+    } catch (PermanentConsumerException pce) {

Review Comment:
   I wasn't sure if ingestion should be stopped. After a couple of conversations, the feedback is that it should be stopped. I'll bubble up the exception.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473771585


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -140,6 +142,8 @@ public class PinotLLCRealtimeSegmentManager {
   // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
 
+  private Map<Pair<String, String>, SegmentErrorInfo> _errorCache;

Review Comment:
   I should limit its size. I am thinking of using a `LinkedHashMap` to cap the number of entries, similar to https://docs.oracle.com/javase/8/docs/api/java/util/LinkedHashMap.html#removeEldestEntry-java.util.Map.Entry-
   
   Is that OK?
   
   However, these methods are not synchronized. I can use [Collections.synchronizedMap](https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#synchronizedMap-java.util.Map-) to wrap the `LinkedHashMap`, as shown in the sketch below.
   
   Another minor improvement is to change the signature to `getSegmentErrors(String tableName, String segmentName)`. Then iterators and streams are not required, and all access goes through `get`, which is synchronized.
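   
   A minimal sketch of that idea, assuming a hypothetical `MAX_ERROR_CACHE_SIZE` cap (the constant and field names are illustrative, not the final code):
   
   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import org.apache.commons.lang3.tuple.Pair;
   
   private static final int MAX_ERROR_CACHE_SIZE = 1000; // illustrative cap
   
   // Insertion-ordered map that evicts its eldest entry once the cap is exceeded,
   // wrapped so individual get/put calls are synchronized.
   private final Map<Pair<String, String>, SegmentErrorInfo> _errorCache =
       Collections.synchronizedMap(new LinkedHashMap<Pair<String, String>, SegmentErrorInfo>() {
         @Override
         protected boolean removeEldestEntry(Map.Entry<Pair<String, String>, SegmentErrorInfo> eldest) {
           return size() > MAX_ERROR_CACHE_SIZE;
         }
       });
   ```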





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473780427


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
     _segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
     _partitionGroupConsumer =
         _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+    try {
+      _partitionGroupConsumer.validateStreamState(_currentOffset);
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,
+              ServerGauge.INVALID_REALTIME_STREAM_STATE_EXCEPTION, 0);
+    } catch (PermanentConsumerException pce) {

Review Comment:
   Going back on this. After more discussion, it is a bad idea to stop ingestion, as that will cause more data loss. So the decision right now is to alert but not stop. Also note that the same condition is checked in `RealtimeDataValidationManager`, which increases a meter but does not attempt to stop, so there is a precedent.





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1857759620

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12157?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: `9 lines` in your changes are missing coverage. Please review.
   > Comparison is base [(`d38e15d`)](https://app.codecov.io/gh/apache/pinot/commit/d38e15d1fabe4aa213513f919b719f3249b259d3?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.61% compared to head [(`962eabf`)](https://app.codecov.io/gh/apache/pinot/pull/12157?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 46.50%.
   > Report is 8 commits behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/12157?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...a/manager/realtime/RealtimeSegmentDataManager.java](https://app.codecov.io/gh/apache/pinot/pull/12157?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUmVhbHRpbWVTZWdtZW50RGF0YU1hbmFnZXIuamF2YQ==) | 25.00% | [9 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12157?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #12157       +/-   ##
   =============================================
   - Coverage     61.61%   46.50%   -15.12%     
   + Complexity     1153      946      -207     
   =============================================
     Files          2407     1809      -598     
     Lines        130922    95218    -35704     
     Branches      20223    15350     -4873     
   =============================================
   - Hits          80669    44280    -36389     
   - Misses        44366    47795     +3429     
   + Partials       5887     3143     -2744     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.50% <35.71%> (-15.00%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.46% <35.71%> (-15.15%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.48% <35.71%> (-14.98%)` | :arrow_down: |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.50% <35.71%> (-15.12%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.50% <35.71%> (-15.11%)` | :arrow_down: |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.50% <35.71%> (-0.17%)` | :arrow_down: |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12157/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12157?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1471662750


##########
pinot-tools/src/main/java/org/apache/pinot/tools/RealTimeSlowConsumer.java:
##########
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.utils.KafkaStarterUtils;
+
+
+public class RealTimeSlowConsumer extends QuickStartBase {
+    private static final String DEFAULT_CONTROLLER_URL = "http://localhost:9000";
+
+    @Override
+    protected Map<String, String> getDefaultStreamTableDirectories() {
+        return ImmutableMap.<String, String>builder()

Review Comment:
   The indentation is incorrect. Please apply [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide) and reformat the changes.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the stream partition is in a valid state.
+   *
+   * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to
+   * retention configuration of the stream which may lead to missed data.
+   *
+   * @param startOffset The offset of the first message desired, inclusive
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset) {
+    if (_partitionMetadataProvider == null) {
+      createPartitionMetadataProvider("validateStartOffset");
+    }
+
+    try {
+      StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
+          /*maxWaitTimeMs=*/5000
+      );

Review Comment:
   Use `fetchEarliestStreamOffset(5000)`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1339,7 +1346,14 @@ private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria
         LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffsetInSegmentZkMetadata,
             streamSmallestOffset, partitionGroupId, tableName);
         _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
-        return streamSmallestOffset;
+        String message = "Data lost from offset: " + startOffsetInSegmentZkMetadata
+                        + " to: " + streamSmallestOffset
+                        + " for partition: " + partitionGroupId
+                        + " of table: " + tableName;
+
+        _errorCache.put(Pair.of(tableName, segmentName),
+            new SegmentErrorInfo(String.valueOf(System.currentTimeMillis()), message, ""));

Review Comment:
   ```suggestion
            new SegmentErrorInfo(System.currentTimeMillis(), message, null));
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1711,4 +1725,14 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
   URI createSegmentPath(String rawTableName, String segmentName) {
     return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
   }
+
+  public Map<String, SegmentErrorInfo> getSegmentErrors(String tableNameWithType) {

Review Comment:
   Why not store a two-level map: `Map<String, Map<String, SegmentErrorInfo>>`?
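   
   For illustration, the two-level layout might look like this sketch (the method shapes and the `ConcurrentHashMap` choice are assumptions, not the PR's code):
   
   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   // tableNameWithType -> (segmentName -> most recent error)
   private final Map<String, Map<String, SegmentErrorInfo>> _errorCache = new ConcurrentHashMap<>();
   
   public Map<String, SegmentErrorInfo> getSegmentErrors(String tableNameWithType) {
     // Per-table lookup becomes a single get instead of filtering Pair keys.
     return _errorCache.getOrDefault(tableNameWithType, Collections.emptyMap());
   }
   
   public void addSegmentError(String tableNameWithType, String segmentName, SegmentErrorInfo errorInfo) {
     _errorCache.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>())
         .put(segmentName, errorInfo);
   }
   ```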



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -140,6 +142,8 @@ public class PinotLLCRealtimeSegmentManager {
   // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
 
+  private Map<Pair<String, String>, SegmentErrorInfo> _errorCache;

Review Comment:
   This is not really a cache because, as of now, we never clean up the entries. Do you plan to add the cleanup logic in this PR? If not, let's at least add a TODO.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the stream partition is in a valid state.
+   *
+   * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to
+   * retention configuration of the stream which may lead to missed data.
+   *
+   * @param startOffset The offset of the first message desired, inclusive
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset) {
+    if (_partitionMetadataProvider == null) {
+      createPartitionMetadataProvider("validateStartOffset");
+    }
+
+    try {
+      StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
+          /*maxWaitTimeMs=*/5000
+      );
+      if (streamSmallestOffset.compareTo(startOffset) > 0) {
+        _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+        String message = "startOffset(" + startOffset
+            + ") is older than topic's beginning offset(" + streamSmallestOffset + ")";
+        _segmentLogger.error(message);
+        _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+            new SegmentErrorInfo(String.valueOf(now()), message, "")
+        );
+      }
+    } catch (TimeoutException tce) {

Review Comment:
   We should catch all exceptions so this check cannot break ingestion (Kinesis doesn't support this method at all). You may use `fetchEarliestStreamOffset()`, which already handles this.
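   
   Roughly, the defensive shape being asked for (a sketch, not the PR's final code; it assumes the helper swallows connector errors and returns null when the earliest offset cannot be determined):
   
   ```java
   StreamPartitionMsgOffset streamSmallestOffset = fetchEarliestStreamOffset(5000);
   // A null result means the earliest offset could not be fetched
   // (e.g. Kinesis does not support the lookup); skip the check rather
   // than let a best-effort validation break ingestion.
   if (streamSmallestOffset != null && streamSmallestOffset.compareTo(startOffset) > 0) {
     // ... report data loss as in the diff above ...
   }
   ```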



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the stream partition is in a valid state.
+   *
+   * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to
+   * retention configuration of the stream which may lead to missed data.
+   *
+   * @param startOffset The offset of the first message desired, inclusive
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset) {
+    if (_partitionMetadataProvider == null) {
+      createPartitionMetadataProvider("validateStartOffset");
+    }
+
+    try {
+      StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
+          /*maxWaitTimeMs=*/5000
+      );
+      if (streamSmallestOffset.compareTo(startOffset) > 0) {
+        _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+        String message = "startOffset(" + startOffset
+            + ") is older than topic's beginning offset(" + streamSmallestOffset + ")";
+        _segmentLogger.error(message);
+        _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+            new SegmentErrorInfo(String.valueOf(now()), message, "")

Review Comment:
   ```suggestion
            new SegmentErrorInfo(now(), message, null)
   ```





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1914824597

   @Jackie-Jiang @ege-st Can you please re-review this PR? The major changes since the last review are:
   1. Cleaned up the check in `RealtimeSegmentDataManager` to reuse member objects when checking offsets.
   2. Reported validation errors from both `RealtimeSegmentDataManager` and `RealtimeSegmentValidationManager` to the Debug APIs.
   3. Added a test that simulates this scenario; it waits for Kafka to fast-forward the offset and for the error messages to appear in the debug info.




Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473771585


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -140,6 +142,8 @@ public class PinotLLCRealtimeSegmentManager {
   // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
 
+  private Map<Pair<String, String>, SegmentErrorInfo> _errorCache;

Review Comment:
   I should limit its size. I am thinking of using a `LinkedHashMap` that evicts its oldest entry, as described in https://docs.oracle.com/javase/8/docs/api/java/util/LinkedHashMap.html#removeEldestEntry-java.util.Map.Entry-
   
   Is that OK? 
   
   However, `LinkedHashMap` is not synchronized. I can use [Collections.synchronizedMap](https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#synchronizedMap-java.util.Map-) to wrap it. 
   
   Another minor improvement is to change the signature to `getSegmentErrors(String tableName, String segmentName)`. Then iterators and streams are not required, and all access goes through `get`, which the wrapper synchronizes. A sketch of the bounded map follows.
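   
   A minimal sketch of that idea (the cap value is a placeholder, and `Pair` is the `org.apache.commons.lang3.tuple.Pair` already used in the PR):
   
   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import org.apache.commons.lang3.tuple.Pair;
   
   // Hypothetical cap; the right value is an open question (see below).
   private static final int MAX_ERROR_CACHE_SIZE = 100;
   
   // Insertion-ordered map that evicts its eldest entry once the cap is
   // exceeded, wrapped so that individual get/put calls are synchronized.
   private final Map<Pair<String, String>, SegmentErrorInfo> _errorCache =
       Collections.synchronizedMap(
           new LinkedHashMap<Pair<String, String>, SegmentErrorInfo>() {
             @Override
             protected boolean removeEldestEntry(
                 Map.Entry<Pair<String, String>, SegmentErrorInfo> eldest) {
               return size() > MAX_ERROR_CACHE_SIZE;
             }
           });
   ```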





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "ege-st (via GitHub)" <gi...@apache.org>.
ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1474666166


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -140,6 +142,8 @@ public class PinotLLCRealtimeSegmentManager {
   // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
 
+  private Map<Pair<String, String>, SegmentErrorInfo> _errorCache;

Review Comment:
   Just double-checking my understanding of the error cache: it maps each (table, segment) pair on this server to the most recent error message seen for that table/segment? In other words, for each server, we'll see the most recent error on each segment on that server. 
   
   1. A longer-term question is how to manage noisy errors vs. quiet ones. For example: if there's an error with missing offsets (which you're monitoring for in this PR) and a decoding error on 1/5 messages, the decoding errors will flood the cache and push out the offset error before anyone sees it.
   2. What happens when a table/segment is deleted or moved? The error cache will still hold the non-existent segments and provide invalid information. We have this issue with Ingestion Lag metrics, and it frequently causes false alerts. If this happens multiple times, we can wind up with many servers reporting errors for the same segment, which will be confusing during investigations.
   3. If you limit the size of this map, it still needs to hold entries for all extant segments on a server, and as far as I know the number of segments a single server can host is not strictly bounded. So how can we determine what the max size should be?





Re: [PR] Detect expired messages in Kafka. Log and set a gauge. [pinot]

Posted by "vrajat (via GitHub)" <gi...@apache.org>.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1475535154


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -140,6 +142,8 @@ public class PinotLLCRealtimeSegmentManager {
   // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
 
+  private Map<Pair<String, String>, SegmentErrorInfo> _errorCache;

Review Comment:
   One point is that this is for the debug APIs only. Errors from this list are not exposed to the user; the official user interface is metrics and logs. Given that the error list is for debugging only:
   
   
   1. High error rate: Limiting the size is a good idea so that the map doesn't use up memory. Losing errors is OK because this is not the source of truth and is only for debugging. Right now there is only one error source; if there are too many errors, the important signal is simply that a data loss occurred.
   2. Segment lifecycle: Since the list is not the source of truth and is used by developers only, a stale entry is OK. The main contribution of this PR is the metric that tracks data loss and the log line with all the necessary info.


