Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/29 17:08:53 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

guozhangwang opened a new pull request, #12363:
URL: https://github.com/apache/kafka/pull/12363

   1. As the title says, fix the constructor parameter ordering of the two sensors.
   2. Also add a few more log lines.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r910233227


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -230,6 +230,7 @@ void updatePartitions(final Set<TopicPartition> newInputPartitions, final Functi
                 // if partition is removed should delete its queue
                 totalBuffered -= queueEntry.getValue().size();
                 totalBytesBuffered -= queueEntry.getValue().getTotalBytesBuffered();
+                totalInputBufferBytesSensor.record(totalBytesBuffered);

Review Comment:
   This is the actual fix?
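   The hunk records the new byte total on the sensor right after a removed partition's bytes are subtracted, so that the metric reflects the reduced buffer. A minimal sketch of that bookkeeping, assuming a plain `LongConsumer` in place of Kafka's `Sensor` and a hypothetical `BufferTracker` class (neither is part of Kafka's API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical model of a partition group that reports its total buffered bytes.
// The LongConsumer stands in for Sensor::record; it is not Kafka's API.
class BufferTracker {
    private final Map<String, Long> bytesPerPartition = new HashMap<>();
    private final LongConsumer totalBytesRecorder;
    private long totalBytesBuffered = 0L;

    BufferTracker(final LongConsumer totalBytesRecorder) {
        this.totalBytesRecorder = totalBytesRecorder;
    }

    // Buffer more bytes for a partition and report the new total.
    void add(final String partition, final long bytes) {
        bytesPerPartition.merge(partition, bytes, Long::sum);
        totalBytesBuffered += bytes;
        totalBytesRecorder.accept(totalBytesBuffered);
    }

    // Drop partitions that are no longer assigned: subtract their bytes,
    // then report the reduced total (the step the reviewed hunk adds).
    void retainOnly(final Set<String> keptPartitions) {
        bytesPerPartition.entrySet().removeIf(entry -> {
            if (keptPartitions.contains(entry.getKey())) {
                return false;
            }
            totalBytesBuffered -= entry.getValue();
            totalBytesRecorder.accept(totalBytesBuffered);
            return true;
        });
    }

    long totalBytesBuffered() {
        return totalBytesBuffered;
    }
}
```

   After `add("a-0", 100)`, `add("b-0", 50)` and `retainOnly(Set.of("a-0"))`, the last recorded value is 100 rather than the stale 150.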





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r912418706


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -93,8 +93,8 @@ RecordQueue queue() {
                    final Map<TopicPartition, RecordQueue> partitionQueues,
                    final Function<TopicPartition, OptionalLong> lagProvider,
                    final Sensor recordLatenessSensor,
+                   final Sensor totalInputBufferBytesSensor,
                    final Sensor enforcedProcessingSensor,
-                   final Sensor totalBytesSensor,

Review Comment:
   Hello @vamossagar12, could you file a JIRA/PR for the test coverage?
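   A unit test for this kind of bug can record through each sensor and assert that the value lands in the expected one. The sketch below is hypothetical: `FakeSensor`, `Group` and `SensorWiringCheck` are illustrative stand-ins, not Kafka classes. Because both constructor parameters have the same type, swapped arguments still compile, but the check fails:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a metrics sensor that remembers what it recorded.
class FakeSensor {
    final List<Double> values = new ArrayList<>();

    void record(final double value) {
        values.add(value);
    }
}

// Hypothetical component with two same-typed sensor parameters, as in the constructor above.
class Group {
    private final FakeSensor totalInputBufferBytesSensor;
    private final FakeSensor enforcedProcessingSensor;

    Group(final FakeSensor totalInputBufferBytesSensor, final FakeSensor enforcedProcessingSensor) {
        this.totalInputBufferBytesSensor = totalInputBufferBytesSensor;
        this.enforcedProcessingSensor = enforcedProcessingSensor;
    }

    void updateBufferedBytes(final long totalBytes) {
        totalInputBufferBytesSensor.record(totalBytes);
    }

    void enforceProcessing() {
        enforcedProcessingSensor.record(1.0);
    }
}

class SensorWiringCheck {
    // True only if buffered-byte updates reach the bytes sensor and nothing else.
    static boolean bytesReachBytesSensor(final Group group, final FakeSensor bytes, final FakeSensor enforced) {
        group.updateBufferedBytes(42L);
        return bytes.values.equals(List.of(42.0)) && enforced.values.isEmpty();
    }
}
```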





[GitHub] [kafka] lihaosky commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
lihaosky commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r910333160


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -797,8 +798,10 @@ void runOnce() {
                     totalProcessed += processed;
                     totalRecordsProcessedSinceLastSummary += processed;
                     final long bufferSize = taskManager.getInputBufferSizeInBytes();
-                    if (bufferSize <= maxBufferSizeBytes.get()) {
-                        mainConsumer.resume(mainConsumer.paused());
+                    if (bufferSize <= maxBufferSizeBytes.get() && !pausedPartitions.isEmpty()) {
+                        log.info("Buffered records size {} bytes falls below {}. Resuming all the paused partitions {} in the consumer",
+                            bufferSize, maxBufferSizeBytes.get(), pausedPartitions);
+                        mainConsumer.resume(pausedPartitions);

Review Comment:
   Should `pausedPartitions` be cleared somewhere? After this line?





[GitHub] [kafka] mjsax commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r910237100


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -93,8 +93,8 @@ RecordQueue queue() {
                    final Map<TopicPartition, RecordQueue> partitionQueues,
                    final Function<TopicPartition, OptionalLong> lagProvider,
                    final Sensor recordLatenessSensor,
+                   final Sensor totalInputBufferBytesSensor,
                    final Sensor enforcedProcessingSensor,
-                   final Sensor totalBytesSensor,

Review Comment:
   Thinking about this fix: would it be possible to avoid such errors in the future? Can we make `Sensor` more type safe? For this issue, do we have missing tests (or is this too hard to test)?
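   One way to get the type safety asked about here is to wrap each metric in its own small type, so that passing the enforced-processing sensor where the buffer-bytes sensor is expected becomes a compile error instead of a silent mix-up. A minimal sketch, assuming a hypothetical `RawSensor` stand-in rather than Kafka's `org.apache.kafka.common.metrics.Sensor`; the wrapper and class names are illustrative only:

```java
// Hypothetical minimal sensor stand-in (not Kafka's Sensor class).
class RawSensor {
    private double last = Double.NaN;

    void record(final double value) {
        last = value;
    }

    double last() {
        return last;
    }
}

// Distinct wrapper types: passing one where the other is expected does not compile.
final class TotalInputBufferBytesSensor {
    private final RawSensor delegate;

    TotalInputBufferBytesSensor(final RawSensor delegate) {
        this.delegate = delegate;
    }

    void record(final long totalBytes) {
        delegate.record(totalBytes);
    }
}

final class EnforcedProcessingSensor {
    private final RawSensor delegate;

    EnforcedProcessingSensor(final RawSensor delegate) {
        this.delegate = delegate;
    }

    void record() {
        delegate.record(1.0);
    }
}

class TypedGroup {
    final TotalInputBufferBytesSensor bytesSensor;
    final EnforcedProcessingSensor enforcedSensor;

    // Swapping these two arguments at a call site is now a compile-time error.
    TypedGroup(final TotalInputBufferBytesSensor bytesSensor, final EnforcedProcessingSensor enforcedSensor) {
        this.bytesSensor = bytesSensor;
        this.enforcedSensor = enforcedSensor;
    }
}
```

   The cost is one small class per metric; a test like the one suggested for coverage would still be useful for catching wiring mistakes inside a wrapper.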





[GitHub] [kafka] guozhangwang commented on pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12363:
URL: https://github.com/apache/kafka/pull/12363#issuecomment-1172991178

   Thanks @lihaosky and @mjsax. I will reduce the scope of this PR to just the sensor-ordering bug fix and then merge.




[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r910244367


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -93,8 +93,8 @@ RecordQueue queue() {
                    final Map<TopicPartition, RecordQueue> partitionQueues,
                    final Function<TopicPartition, OptionalLong> lagProvider,
                    final Sensor recordLatenessSensor,
+                   final Sensor totalInputBufferBytesSensor,
                    final Sensor enforcedProcessingSensor,
-                   final Sensor totalBytesSensor,

Review Comment:
   There are a couple of unit tests: https://github.com/apache/kafka/blob/0924fd3f9f75c446310ed1e97b44bbc3f33c6c31/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java#L780 which try to check pause/resume. I think what's missing (and it's a miss on my part, which I now realise) is an end-to-end integration test: something like pushing some data, checking the paused partitions, and later testing their resumption.





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r912501685


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -93,8 +93,8 @@ RecordQueue queue() {
                    final Map<TopicPartition, RecordQueue> partitionQueues,
                    final Function<TopicPartition, OptionalLong> lagProvider,
                    final Sensor recordLatenessSensor,
+                   final Sensor totalInputBufferBytesSensor,
                    final Sensor enforcedProcessingSensor,
-                   final Sensor totalBytesSensor,

Review Comment:
   @guozhangwang, done. Please find the JIRA here: https://issues.apache.org/jira/browse/KAFKA-14040





[GitHub] [kafka] guozhangwang commented on pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12363:
URL: https://github.com/apache/kafka/pull/12363#issuecomment-1173136467

   Merged to trunk.




[GitHub] [kafka] mjsax commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r911238798


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -797,8 +798,10 @@ void runOnce() {
                     totalProcessed += processed;
                     totalRecordsProcessedSinceLastSummary += processed;
                     final long bufferSize = taskManager.getInputBufferSizeInBytes();
-                    if (bufferSize <= maxBufferSizeBytes.get()) {
-                        mainConsumer.resume(mainConsumer.paused());
+                    if (bufferSize <= maxBufferSizeBytes.get() && !pausedPartitions.isEmpty()) {
+                        log.info("Buffered records size {} bytes falls below {}. Resuming all the paused partitions {} in the consumer",
+                            bufferSize, maxBufferSizeBytes.get(), pausedPartitions);
+                        mainConsumer.resume(pausedPartitions);

Review Comment:
   Not sure I follow. `pausedPartitions` is a local variable and its scope ends right after this block.
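   Because the set is declared inside the method, each call works on a fresh snapshot and there is nothing to clear afterwards. The hunk does not show where `pausedPartitions` is computed; the sketch below assumes it is read from the consumer's currently paused partitions, and uses a hypothetical `FakeConsumer` and `ResumeStep` instead of Kafka's `Consumer` and `StreamThread`:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for the pause/resume part of a consumer.
class FakeConsumer {
    private final Set<String> paused = new HashSet<>();

    // Returns a snapshot copy, so callers can hold it while the consumer changes.
    Set<String> paused() {
        return new HashSet<>(paused);
    }

    void pause(final Set<String> partitions) {
        paused.addAll(partitions);
    }

    void resume(final Set<String> partitions) {
        paused.removeAll(partitions);
    }
}

class ResumeStep {
    // The paused set is a local snapshot taken on each call, so no
    // long-lived collection needs clearing after the resume.
    static boolean maybeResume(final FakeConsumer consumer, final long bufferSize, final long maxBufferSizeBytes) {
        final Set<String> pausedPartitions = consumer.paused();
        if (bufferSize <= maxBufferSizeBytes && !pausedPartitions.isEmpty()) {
            System.out.printf("Buffered records size %d bytes falls below %d. Resuming %s%n",
                bufferSize, maxBufferSizeBytes, pausedPartitions);
            consumer.resume(pausedPartitions);
            return true;
        }
        return false;
    }
}
```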





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r910249613


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java:
##########
@@ -93,8 +93,8 @@ RecordQueue queue() {
                    final Map<TopicPartition, RecordQueue> partitionQueues,
                    final Function<TopicPartition, OptionalLong> lagProvider,
                    final Sensor recordLatenessSensor,
+                   final Sensor totalInputBufferBytesSensor,
                    final Sensor enforcedProcessingSensor,
-                   final Sensor totalBytesSensor,

Review Comment:
   Also, I think type safety might be a good way to avoid this kind of error. It was an oversight on my part, but something like type safety could have caught it.





[GitHub] [kafka] mjsax commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r910234367


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -969,7 +972,9 @@ long pollPhase() {
             final long bufferSize = taskManager.getInputBufferSizeInBytes();
             // Pausing partitions as the buffer size now exceeds max buffer size
             if (bufferSize > maxBufferSizeBytes.get()) {
-                log.info("Buffered records size {} bytes exceeds {}. Pausing the consumer", bufferSize, maxBufferSizeBytes.get());
+                final Set<TopicPartition> nonEmptyPartitions = taskManager.nonEmptyPartitions();

Review Comment:
   nit: we can reuse `nonEmptyPartitions` in the `pause` call below
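   Computing the set once and passing it to both the log line and the `pause` call keeps the two consistent: the partitions that are logged are exactly the ones paused. A minimal sketch, assuming partitions are plain strings and buffered bytes per partition are available as a map; `PausableConsumer` and `PauseStep` are hypothetical stand-ins for the real consumer and `taskManager.nonEmptyPartitions()`:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the pause surface of a consumer.
class PausableConsumer {
    final Set<String> paused = new HashSet<>();

    void pause(final Set<String> partitions) {
        paused.addAll(partitions);
    }
}

class PauseStep {
    // Partitions that still have buffered bytes.
    static Set<String> nonEmptyPartitions(final Map<String, Long> bufferedBytes) {
        return bufferedBytes.entrySet().stream()
            .filter(e -> e.getValue() > 0)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }

    // Compute the set once and reuse it for both the log line and the pause call.
    static Set<String> maybePause(final PausableConsumer consumer,
                                  final Map<String, Long> bufferedBytes,
                                  final long maxBufferSizeBytes) {
        final long bufferSize = bufferedBytes.values().stream().mapToLong(Long::longValue).sum();
        if (bufferSize <= maxBufferSizeBytes) {
            return Set.of();
        }
        final Set<String> nonEmptyPartitions = nonEmptyPartitions(bufferedBytes);
        System.out.printf("Buffered records size %d bytes exceeds %d. Pausing partitions %s%n",
            bufferSize, maxBufferSizeBytes, nonEmptyPartitions);
        consumer.pause(nonEmptyPartitions);
        return nonEmptyPartitions;
    }
}
```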





[GitHub] [kafka] vamossagar12 commented on pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on PR #12363:
URL: https://github.com/apache/kafka/pull/12363#issuecomment-1173137783

   Also, @guozhangwang / @mjsax, I was thinking of enhancing the behaviour of the new config so that it distributes the cache size at a more granular level (tasks -> partitions). This would make it similar to the older, deprecated config. WDYT?
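   To make the proposal concrete, here is one possible two-level split of a per-thread byte budget: evenly across tasks, then evenly across each task's partitions. This is an illustrative sketch only; the even-split policy, `BudgetSplitter`, and the string task and partition IDs are assumptions, not what Kafka Streams implements:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BudgetSplitter {
    // Splits totalBytes evenly across parts, handing the remainder out one byte
    // at a time to the first entries so the shares always sum to totalBytes.
    static <K> Map<K, Long> splitEvenly(final long totalBytes, final List<K> parts) {
        final Map<K, Long> shares = new LinkedHashMap<>();
        if (parts.isEmpty()) {
            return shares;
        }
        final long base = totalBytes / parts.size();
        long remainder = totalBytes % parts.size();
        for (final K part : parts) {
            final long extra = remainder > 0 ? 1 : 0;
            shares.put(part, base + extra);
            remainder -= extra;
        }
        return shares;
    }

    // Two-level split: thread budget -> tasks -> partitions of each task.
    static Map<String, Long> perPartitionBudget(final long threadBudget,
                                                final Map<String, List<String>> partitionsByTask) {
        final Map<String, Long> result = new LinkedHashMap<>();
        final Map<String, Long> taskShares = splitEvenly(threadBudget, List.copyOf(partitionsByTask.keySet()));
        for (final Map.Entry<String, Long> task : taskShares.entrySet()) {
            result.putAll(splitEvenly(task.getValue(), partitionsByTask.get(task.getKey())));
        }
        return result;
    }
}
```

   With a budget of 1001 bytes, tasks `0_0` (two partitions) and `0_1` (one partition) would get 501 and 500 bytes, and the partitions 251, 250 and 500. A weighted split (for example by partition lag) would be an alternative policy.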




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r910380094


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -797,8 +798,10 @@ void runOnce() {
                     totalProcessed += processed;
                     totalRecordsProcessedSinceLastSummary += processed;
                     final long bufferSize = taskManager.getInputBufferSizeInBytes();
-                    if (bufferSize <= maxBufferSizeBytes.get()) {
-                        mainConsumer.resume(mainConsumer.paused());
+                    if (bufferSize <= maxBufferSizeBytes.get() && !pausedPartitions.isEmpty()) {
+                        log.info("Buffered records size {} bytes falls below {}. Resuming all the paused partitions {} in the consumer",
+                            bufferSize, maxBufferSizeBytes.get(), pausedPartitions);
+                        mainConsumer.resume(pausedPartitions);

Review Comment:
   Ah yes!





[GitHub] [kafka] guozhangwang merged pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
guozhangwang merged PR #12363:
URL: https://github.com/apache/kafka/pull/12363




[GitHub] [kafka] lihaosky commented on a diff in pull request #12363: HOTFIX: Correct ordering of input buffer and enforced processing sensors

Posted by GitBox <gi...@apache.org>.
lihaosky commented on code in PR #12363:
URL: https://github.com/apache/kafka/pull/12363#discussion_r911442135


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -797,8 +798,10 @@ void runOnce() {
                     totalProcessed += processed;
                     totalRecordsProcessedSinceLastSummary += processed;
                     final long bufferSize = taskManager.getInputBufferSizeInBytes();
-                    if (bufferSize <= maxBufferSizeBytes.get()) {
-                        mainConsumer.resume(mainConsumer.paused());
+                    if (bufferSize <= maxBufferSizeBytes.get() && !pausedPartitions.isEmpty()) {
+                        log.info("Buffered records size {} bytes falls below {}. Resuming all the paused partitions {} in the consumer",
+                            bufferSize, maxBufferSizeBytes.get(), pausedPartitions);
+                        mainConsumer.resume(pausedPartitions);

Review Comment:
   It was previously a member variable; I guess Guozhang changed it to a local one.
   
   Edit: in https://github.com/apache/kafka/pull/12363/commits/bae2e2ac82da09e12576f43a45382009bddfd5aa


