Posted to commits@samza.apache.org by "ryucc (via GitHub)" <gi...@apache.org> on 2023/06/22 21:14:07 UTC

[GitHub] [samza] ryucc opened a new pull request, #1674: Close SystemConsumer properly

ryucc opened a new pull request, #1674:
URL: https://github.com/apache/samza/pull/1674

   # Changes
   `KafkaCheckpointManager` is a reused class. On reuse, the assumption that `taskNamesToCheckpoints == null` may no longer hold. This leaves the `SystemConsumer` unclosed and causes a memory leak.
   
   There are 2 possible fixes.
   1. Adding a patch to close `SystemConsumer` properly
   2. Setting `taskNamesToCheckpoints` back to `null` on `stop()`
   
   We are choosing solution 1, since the `stopConsumerAfterFirstRead` option expects only 1 read per task(?).
   
   # Issue
   [SAMZA-2785](https://issues.apache.org/jira/browse/SAMZA-2785)
   
   Calling `start()` and `stop()` multiple times on the same `KafkaCheckpointManager` while `stopConsumerAfterFirstRead == true` leaves the `SystemConsumer` unclosed. The unclosed `SystemConsumer` can cause memory leaks in some implementations.
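   
   The reuse pattern described above can be sketched with a small model. This is not the real `KafkaCheckpointManager`: `CountingConsumer`, `ModelCheckpointManager`, `LeakDemo`, and the `resetCacheOnStop` switch are hypothetical names, and the control flow is an assumption based only on this description. The switch models option 2 from the list above.
   
   ```scala
   // Hypothetical stand-in for a SystemConsumer; it only counts lifecycle calls.
   final class CountingConsumer {
     var starts = 0
     var stops = 0
     def start(): Unit = starts += 1
     def stop(): Unit = stops += 1
   }
   
   // Simplified model of the reuse pattern (an assumption, not the Samza source).
   final class ModelCheckpointManager(
       consumer: CountingConsumer,
       stopConsumerAfterFirstRead: Boolean,
       resetCacheOnStop: Boolean) {
   
     // null means "nothing has been read yet", mirroring the PR description.
     private var taskNamesToCheckpoints: Map[String, String] = null
   
     def start(): Unit = consumer.start()
   
     def readLastCheckpoint(taskName: String): Option[String] = {
       if (taskNamesToCheckpoints == null) {
         taskNamesToCheckpoints = Map(taskName -> "checkpoint-0")
         // The consumer is closed only on the very first read.
         if (stopConsumerAfterFirstRead) consumer.stop()
       }
       taskNamesToCheckpoints.get(taskName)
     }
   
     def stop(): Unit = {
       // With stopConsumerAfterFirstRead == true, stop() assumes the read path
       // has already closed the consumer.
       if (!stopConsumerAfterFirstRead) consumer.stop()
       // Option 2: forget the cache so the next cycle reads (and closes) again.
       if (resetCacheOnStop) taskNamesToCheckpoints = null
     }
   }
   
   object LeakDemo {
     // Returns (starts, stops) after `cycles` start/read/stop rounds on one manager.
     def run(cycles: Int, resetCacheOnStop: Boolean): (Int, Int) = {
       val consumer = new CountingConsumer
       val manager = new ModelCheckpointManager(
         consumer,
         stopConsumerAfterFirstRead = true,
         resetCacheOnStop = resetCacheOnStop)
       (1 to cycles).foreach { _ =>
         manager.start()
         manager.readLastCheckpoint("task-0")
         manager.stop()
       }
       (consumer.starts, consumer.stops)
     }
   }
   ```
   
   In this model, `LeakDemo.run(5, resetCacheOnStop = false)` counts 5 starts against 1 stop, while `resetCacheOnStop = true` brings the two counts level.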
   
   
   Evidence:
   
   In production logs, SystemConsumer was started 1741 times, but only closed 14 times.
   
   [katxxxx@xxxxxx xxxx]$ grep "Starting the checkpoint SystemConsumer from oldest offset" xxxxxx.log|wc -l
   1741
   [katxxxxx@xxxxxx xxxx]$ grep "Stopping system consumer" xxxxxxxxxx.log |wc -l
   14 
   We also have a heap dump showing `KafkaSystemConsumer` taking up 8 GB of memory.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] ryucc commented on pull request #1674: Close SystemConsumer properly

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on PR #1674:
URL: https://github.com/apache/samza/pull/1674#issuecomment-1603384515

   In that case, new `SystemConsumer`s shouldn't be created on `KafkaCheckpointManager.start()`.
   
   Looking at the logs another way, new `SystemConsumer`s are started but never used for `readCheckpoints`. They are also never closed, and we see them accumulating in the heap.
   
   This suggests a 3rd solution:
   ```
     override def start(): Unit = {
       ...
       // Start the consumer only once, even if start() is called again on reuse.
       if (!systemConsumerStarted) {
         systemConsumer.start()
         systemConsumerStarted = true
       }
     }
   ```
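   
   To see how this guard would affect the start/stop counts, here is a minimal self-contained sketch. Only `systemConsumerStarted` comes from the snippet above; `LifecycleCounter`, `GuardedCheckpointManager`, and `GuardedStartDemo` are hypothetical names, and the read path is a simplified assumption that models `stopConsumerAfterFirstRead == true`.
   
   ```scala
   // Hypothetical stand-in for a SystemConsumer; it only counts lifecycle calls.
   final class LifecycleCounter {
     var starts = 0
     var stops = 0
     def start(): Unit = starts += 1
     def stop(): Unit = stops += 1
   }
   
   // Simplified model of a checkpoint manager with a guarded start().
   final class GuardedCheckpointManager(consumer: LifecycleCounter) {
     private var systemConsumerStarted = false
     private var cache: Option[Map[String, String]] = None
   
     def start(): Unit = {
       // Repeated start() calls on a reused manager become no-ops for the consumer.
       if (!systemConsumerStarted) {
         consumer.start()
         systemConsumerStarted = true
       }
     }
   
     def readLastCheckpoint(taskName: String): Option[String] = {
       if (cache.isEmpty) {
         cache = Some(Map(taskName -> "checkpoint-0"))
         // Models stopConsumerAfterFirstRead == true: close right after the first read.
         consumer.stop()
       }
       cache.flatMap(_.get(taskName))
     }
   
     // In this mode the consumer was already closed by the first read.
     def stop(): Unit = ()
   }
   
   object GuardedStartDemo {
     // Returns (starts, stops) after `cycles` start/read/stop rounds on one manager.
     def run(cycles: Int): (Int, Int) = {
       val consumer = new LifecycleCounter
       val manager = new GuardedCheckpointManager(consumer)
       (1 to cycles).foreach { _ =>
         manager.start()
         manager.readLastCheckpoint("task-0")
         manager.stop()
       }
       (consumer.starts, consumer.stops)
     }
   }
   ```
   
   Under these assumptions the consumer is started once and stopped once regardless of the number of cycles. The flag is never reset here, so this only fits if later reads can be served from the cached checkpoints.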




[GitHub] [samza] mynameborat commented on pull request #1674: Close SystemConsumer properly

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on PR #1674:
URL: https://github.com/apache/samza/pull/1674#issuecomment-1603369836

   > > I don't think there is any problem with the existing code. For active containers, the expected behavior is to keep the `KafkaCheckpointManager` started until the entire container is shutdown which would trigger stop.
   > 
   > This PR doesn't change when `KafkaCheckpointManager` is shutdown. The PR is trying to address that when `KafkaCheckpointManager` is shutdown, `SystemConsumer` is left open.
   
   I meant `SystemConsumer` shutdown. 
   
   For standalone, the containers are recreated on every rebalance. It is possible that previous attempts to shut down the container failed, and perhaps that failure is being ignored in the application.




[GitHub] [samza] mynameborat commented on pull request #1674: Close SystemConsumer properly

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on PR #1674:
URL: https://github.com/apache/samza/pull/1674#issuecomment-1603361792

   I don't think there is any problem with the existing code. For active containers, the expected behavior is to keep the `KafkaCheckpointManager` started until the entire container is shutdown which would trigger stop.
   
   On the other hand, standby containers only need to read the checkpoint once, so the consumer is closed right after that read.
   
   I'd trace it back to see if there is any violation in how this flag is set, or in the assumptions that this flag is built on.




[GitHub] [samza] ryucc commented on pull request #1674: Close SystemConsumer properly

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on PR #1674:
URL: https://github.com/apache/samza/pull/1674#issuecomment-1603368766

   Somehow the `KafkaCheckpointManager` is started/stopped multiple times in standalone mode.




[GitHub] [samza] ryucc commented on pull request #1674: Close SystemConsumer properly

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on PR #1674:
URL: https://github.com/apache/samza/pull/1674#issuecomment-1603366596

   > I don't think there is any problem with the existing code. For active containers, the expected behavior is to keep the `KafkaCheckpointManager` started until the entire container is shutdown which would trigger stop.
   
   This PR doesn't change when `KafkaCheckpointManager` is shutdown. The PR is trying to address that when `KafkaCheckpointManager` is shutdown, `SystemConsumer` is left open.
   
   




[GitHub] [samza] ryucc commented on pull request #1674: Close SystemConsumer properly

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on PR #1674:
URL: https://github.com/apache/samza/pull/1674#issuecomment-1603427889

   The idempotency guard log line for `KafkaSystemConsumer` is never triggered in my logs, while there are 168 successful starts and only 3 stops.
   
   ```
   $ grep "Attempting to start the consumer for the second" xxxxx.log.2023-06-21 | wc -l
   0
   $ grep ": Consumer started" seas-cloud-consumer.log.2023-06-21 | wc -l
   168
   $ grep "Stopping Samza kafkaConsumer" seas-cloud-consumer.log.2023-06-21 | wc -l
   3
   ```
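   
   The same counts can be collected in one pass with a small Scala helper, which makes it easier to compare starts and stops side by side. This is only a sketch: `LifecycleLogCount` and its methods are hypothetical names, and it does a fixed substring match (like `grep -F`) rather than a regex match.
   
   ```scala
   import scala.io.Source
   
   object LifecycleLogCount {
     // Counts, for each marker, the lines that contain it as a plain substring.
     def countMarkers(lines: Iterable[String], markers: Seq[String]): Map[String, Int] =
       markers.map(marker => marker -> lines.count(_.contains(marker))).toMap
   
     // Reads a log file from `path` (supplied by the caller) and counts the markers.
     def countInFile(path: String, markers: Seq[String]): Map[String, Int] = {
       val source = Source.fromFile(path)
       try countMarkers(source.getLines().toList, markers)
       finally source.close()
     }
   }
   ```
   
   Passing the three marker strings from the `grep` commands above to `countInFile` returns one count per marker for the given log file.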

