Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/03 22:48:39 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

guozhangwang commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r742366571



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1030,8 +1038,11 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
                     streamThread.shutdown();
                     threads.remove(streamThread);
                     final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
-                    log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
+                    final long maxBufferSizePerThread = getBufferSizePerThread(getNumLiveStreamThreads());
+                    log.info("Resizing thread cache again since new thread can not be started, final cache size per thread is {}", cacheSizePerThread);

Review comment:
       nit: ditto here. See above for the consolidated log line. Here we can emphasize it is "Terminating newly added threads".

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -84,6 +84,9 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String TOTAL_BYTES = "total-bytes";
+    private static final String TOTAL_BYTES_DESCRIPTION = "The total number of bytes accumulated by this task";

Review comment:
       We should be more specific about the description here: the total number of bytes accumulated in this task's input buffer.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -1288,6 +1326,10 @@ int currentNumIterations() {
         return numIterations;
     }
 
+    long bufferSize() {

Review comment:
       This function does not seem to be used.
   
   BTW if we do not maintain the local `bufferSize` then we would not need it anyway :)
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -266,6 +270,10 @@ StampedRecord nextRecord(final RecordInfo info, final long wallClockTime) {
             if (record != null) {
                 --totalBuffered;
 
+                totalBytesBuffered -= (record.key() != null ? record.serializedKeySize() : 0) +

Review comment:
       How about adding a `sizeInBytes` function to `StampedRecord` class, to avoid duplicating the calculation each time?
   
   Also we do not need to check for `key/value == null` or not, since in that case the `ConsumerRecord#serializedKeySize/ValueSize` would be 0.
   
   Also note that the total bytes taken here is more than just the key/value, since we have other fields like headers, timestamps etc. But since it's hard to capture exactly how many bytes are taken in the JVM to store a ConsumerRecord<byte[], byte[]>, I think just getting the sum of key/value/timestamp/offset/topic/partition/headers is fine.
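
A minimal sketch of what such a `sizeInBytes` could look like, assuming a hypothetical stand-in class (`BufferedRecordSketch` is not the real `StampedRecord` or `ConsumerRecord`). It sums key, value, timestamp, offset, partition, topic and headers as suggested above. The explicit null guards are an assumption of this sketch, so that it does not depend on how the real `ConsumerRecord` size accessors report a null key or value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in for a buffered record; NOT the real StampedRecord or ConsumerRecord.
final class BufferedRecordSketch {
    private final String topic;
    private final byte[] key;                  // may be null
    private final byte[] value;                // may be null
    private final Map<String, byte[]> headers; // header key -> header value (value may be null)

    BufferedRecordSketch(final String topic,
                         final byte[] key,
                         final byte[] value,
                         final Map<String, byte[]> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    // Approximate footprint: payload sizes plus fixed-width fields, not the exact JVM heap usage.
    long sizeInBytes() {
        long headerBytes = 0L;
        for (final Map.Entry<String, byte[]> header : headers.entrySet()) {
            headerBytes += header.getKey().getBytes(StandardCharsets.UTF_8).length;
            if (header.getValue() != null) {
                headerBytes += header.getValue().length;
            }
        }
        final long keyBytes = key == null ? 0L : key.length;
        final long valueBytes = value == null ? 0L : value.length;
        return keyBytes + valueBytes
            + Long.BYTES      // timestamp
            + Long.BYTES      // offset
            + Integer.BYTES   // partition
            + topic.getBytes(StandardCharsets.UTF_8).length
            + headerBytes;
    }
}
```

With a single helper like this, both the add and the remove paths of the buffer can adjust `totalBytesBuffered` by the same amount, so the counter cannot drift because two call sites compute the size differently.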

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1013,12 +1017,16 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
                 final int threadIdx = getNextThreadIndex();
                 final int numLiveThreads = getNumLiveStreamThreads();
                 final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
+                final long maxBufferSizePerThread = getBufferSizePerThread(numLiveThreads + 1);
                 log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
                          threadIdx, numLiveThreads + 1, cacheSizePerThread);
                 resizeThreadCache(cacheSizePerThread);
+                log.info("Adding StreamThread-{}, there are now {} threads with a buffer size {} and cache size {} per thread.",

Review comment:
       nit: Could we merge these two info lines into a single one? It seems a bit redundant to log twice here. Also this new log line seems wrong since it has four parameters but only three values provided.
   
   E.g.
   
   ```
   Adding StreamThread-{}, the current total number of thread is {}, each thread now has a buffer size {} and cache size {}
   ```
   
   And
   
   ```
   Terminating StreamThread-{}, the current total number of thread is {}, each thread now has a buffer size {} and cache size {}
   ```
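
To illustrate the placeholder mismatch mentioned above, here is a small sketch that counts `{}` placeholders and refuses to format when the number of arguments differs. `LogLineCheckSketch`, `countPlaceholders` and `format` are hypothetical helpers written for this example; they are not part of SLF4J or Kafka.

```java
// Hypothetical helper for illustration only; not part of SLF4J or Kafka.
final class LogLineCheckSketch {
    // The consolidated message suggested in the review comment above.
    static final String ADD_THREAD_MESSAGE =
        "Adding StreamThread-{}, the current total number of thread is {}, "
            + "each thread now has a buffer size {} and cache size {}";

    // Counts non-overlapping "{}" placeholders in the template.
    static int countPlaceholders(final String template) {
        int count = 0;
        int idx = template.indexOf("{}");
        while (idx >= 0) {
            count++;
            idx = template.indexOf("{}", idx + 2);
        }
        return count;
    }

    // Substitutes arguments in order, failing fast when the counts do not match.
    static String format(final String template, final Object... args) {
        final int expected = countPlaceholders(template);
        if (expected != args.length) {
            throw new IllegalArgumentException(
                "expected " + expected + " arguments but got " + args.length);
        }
        final StringBuilder out = new StringBuilder();
        int from = 0;
        for (final Object arg : args) {
            final int idx = template.indexOf("{}", from);
            out.append(template, from, idx).append(arg);
            from = idx + 2;
        }
        return out.append(template.substring(from)).toString();
    }
}
```

A check of this kind could live in a unit test so that a message with four placeholders and three values is caught before it reaches the logs.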

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1221,13 +1236,27 @@ private long getCacheSizePerThread(final int numStreamThreads) {
         return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
     }
 
+    private long getBufferSizePerThread(final int numStreamThreads) {
+        if (numStreamThreads == 0) {
+            return inputBufferMaxBytes;
+        }
+        return inputBufferMaxBytes / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
+    }
+
     private void resizeThreadCache(final long cacheSizePerThread) {
         processStreamThread(thread -> thread.resizeCache(cacheSizePerThread));
         if (globalStreamThread != null) {
             globalStreamThread.resize(cacheSizePerThread);
         }
     }
 
+    private void resizeMaxBufferSize(final long maxBufferSize) {

Review comment:
       +1 here as well. I think we would always resize both buffer and state cache at the same time moving forward.
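
A rough sketch of resizing both in one step, assuming a hypothetical `ResizableThread` callback in place of the real stream thread and a hypothetical `resizeCacheAndBuffer` method name. The split rule mirrors `getBufferSizePerThread` from the diff above.

```java
import java.util.List;

// Hypothetical sketch; names other than the split rule are not taken from the PR.
final class ThreadMemorySketch {
    // Hypothetical callback standing in for a stream thread.
    interface ResizableThread {
        void resize(long cacheSize, long maxBufferSize);
    }

    // Same split rule as getBufferSizePerThread in the diff: divide the total
    // across the stream threads, plus one share for a global thread if present.
    static long sizePerThread(final long totalBytes,
                              final int numStreamThreads,
                              final boolean hasGlobalTopology) {
        if (numStreamThreads == 0) {
            return totalBytes;
        }
        return totalBytes / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
    }

    // Computes both per-thread sizes once and applies them together.
    static void resizeCacheAndBuffer(final List<ResizableThread> threads,
                                     final long totalCacheSize,
                                     final long inputBufferMaxBytes,
                                     final boolean hasGlobalTopology) {
        final long cache = sizePerThread(totalCacheSize, threads.size(), hasGlobalTopology);
        final long buffer = sizePerThread(inputBufferMaxBytes, threads.size(), hasGlobalTopology);
        for (final ResizableThread thread : threads) {
            thread.resize(cache, buffer);
        }
    }
}
```

Having one entry point also gives a natural place for the single consolidated log line.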

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##########
@@ -354,10 +370,22 @@ int numBuffered(final TopicPartition partition) {
         return recordQueue.size();
     }
 
+    Set<TopicPartition> getNonEmptyTopicPartitions() {
+        final Set<TopicPartition> nonEmptyTopicPartitions = new HashSet<>();
+        for (final RecordQueue recordQueue : nonEmptyQueuesByTime) {
+            nonEmptyTopicPartitions.add(recordQueue.partition());
+        }
+        return nonEmptyTopicPartitions;
+    }
+
     int numBuffered() {
         return totalBuffered;
     }
 
+    long totalBytesBuffered() {

Review comment:
       This function seems to be used only for testing? If so, please move it to the bottom of the class and add a comment that "below are for testing only".

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -601,6 +608,10 @@ boolean runLoop() {
                 if (size != -1L) {
                     cacheResizer.accept(size);
                 }
+                final long bufferBytesSize = maxBufferResizeSize.getAndSet(-1L);
+                if (size != -1) {

Review comment:
       This does not look right to me: why do we use the `size` value read from `cacheResizeSize` to decide whether to assign `maxBufferSizeBytes`? The two should be totally orthogonal.
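
A minimal sketch of the intended shape, assuming each pending resize is held in its own `AtomicLong` with `-1` as the "no request" sentinel, as the diff suggests. The class `ResizeRequestsSketch` and its plain fields are hypothetical; the point is only that each sentinel check reads the value taken from its own holder.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; field names follow the diff, the class itself is illustrative.
final class ResizeRequestsSketch {
    final AtomicLong cacheResizeSize = new AtomicLong(-1L);
    final AtomicLong maxBufferResizeSize = new AtomicLong(-1L);

    long cacheSize = 0L;
    long maxBufferSizeBytes = 0L;

    // Applies whichever resize requests are pending; the two are independent.
    void applyPendingResizes() {
        final long size = cacheResizeSize.getAndSet(-1L);
        if (size != -1L) {
            cacheSize = size;
        }
        final long bufferBytesSize = maxBufferResizeSize.getAndSet(-1L);
        // Check the buffer's own value, not the cache's `size`.
        if (bufferBytesSize != -1L) {
            maxBufferSizeBytes = bufferBytesSize;
        }
    }
}
```

With the check written against `size`, a buffer-only request would be swallowed (the holder is already reset to `-1`), and a cache-only request would assign `-1` as the buffer limit.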

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1269,20 +1280,36 @@ void handleTopologyUpdates() {
         }
     }
 
+    static class RecordsProcessedMetadata {
+
+        int totalProcessed;
+
+        long totalBytesConsumed;
+
+        RecordsProcessedMetadata(final int totalProcessed, final long totalBytesConsumed) {
+            this.totalProcessed = totalProcessed;
+            this.totalBytesConsumed = totalBytesConsumed;
+        }
+    }
+
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    int process(final int maxNumRecords, final Time time) {
+    RecordsProcessedMetadata process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
+        long totalBytesConsumed = 0L;
 
         long now = time.milliseconds();
         for (final Task task : activeTaskIterable()) {
             int processed = 0;
+            long bytesConsumed = 0L;
+            task.setBytesConsumed(0L);

Review comment:
       Not sure I understand the logic here: it seems we only call `setBytesConsumed` once, with 0, and there are no other callers?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1269,20 +1280,36 @@ void handleTopologyUpdates() {
         }
     }
 
+    static class RecordsProcessedMetadata {

Review comment:
       See my other comment: I think we can avoid propagating both processed-records and processed-bytes from the `process` call.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -128,6 +131,22 @@ public static Sensor activeBufferedRecordsSensor(final String threadId,
         return sensor;
     }
 
+    public static Sensor totalBytesSensor(final String threadId,

Review comment:
       While reviewing the PR, I feel the name `total-bytes` under
   
   ```
   type = stream-task-metrics
   thread-id = [thread ID]
   task-id = [task ID]
   ```
   
   is a bit too vague. What about renaming it to `total-input-buffer-bytes`? WDYT @vamossagar12 @ableegoldman ? If we agree here we'd need to update the KIP as well.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1269,20 +1280,36 @@ void handleTopologyUpdates() {
         }
     }
 
+    static class RecordsProcessedMetadata {
+
+        int totalProcessed;
+
+        long totalBytesConsumed;
+
+        RecordsProcessedMetadata(final int totalProcessed, final long totalBytesConsumed) {
+            this.totalProcessed = totalProcessed;
+            this.totalBytesConsumed = totalBytesConsumed;
+        }
+    }
+
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    int process(final int maxNumRecords, final Time time) {
+    RecordsProcessedMetadata process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
+        long totalBytesConsumed = 0L;

Review comment:
       I think we can simplify the logic and do not need to keep track of "consumed bytes" within a task here, see my other comment.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
##########
@@ -49,6 +49,14 @@ public Headers headers() {
         return value.headers();
     }
 
+    public int serializedKeySize() {

Review comment:
       Please see my other comment above: how about just having a `sizeInBytes` function which takes key/value/timestamp/offset/topic/partition/headers into account?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1112,6 +1124,9 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
                         final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
                         log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
                         resizeThreadCache(cacheSizePerThread);
+                        final long maxBufferSizePerThread = getBufferSizePerThread(getNumLiveStreamThreads());
+                        log.info("Resizing max buffer size due to thread removal, new buffer size per thread is {}", maxBufferSizePerThread);

Review comment:
       Ditto here as well.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1269,20 +1280,36 @@ void handleTopologyUpdates() {
         }
     }
 
+    static class RecordsProcessedMetadata {
+
+        int totalProcessed;
+
+        long totalBytesConsumed;
+
+        RecordsProcessedMetadata(final int totalProcessed, final long totalBytesConsumed) {
+            this.totalProcessed = totalProcessed;
+            this.totalBytesConsumed = totalBytesConsumed;
+        }
+    }
+
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    int process(final int maxNumRecords, final Time time) {
+    RecordsProcessedMetadata process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
+        long totalBytesConsumed = 0L;
 
         long now = time.milliseconds();
         for (final Task task : activeTaskIterable()) {
             int processed = 0;
+            long bytesConsumed = 0L;
+            task.setBytesConsumed(0L);

Review comment:
       Also, even with the correct logic, I'm wondering if we can just define it as a local variable within the `process` here instead of augmenting the `Task` interface?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -793,6 +809,10 @@ void runOnce() {
 
                     totalProcessed += processed;
                     totalRecordsProcessedSinceLastSummary += processed;
+                    if (bufferSize > maxBufferSizeBytes && bufferSize - processedData.totalBytesConsumed <= maxBufferSizeBytes) {

Review comment:
       I also feel this logic is a bit awkward, starting from the fact that we need to report how many bytes we've consumed from the process :) I think we can simply do the following:
   
   At the end of the polling phase, and at the end of the process loop (i.e. here), we loop over all the active tasks and get their "input buffer size", which would delegate to each task's corresponding `PartitionGroup` and then `RecordQueue`. Based on that we can decide whether to resume or pause accordingly. Then
   
   1) we do not need to maintain a local `bufferSize` at the task here, i.e. we always re-compute from the task's record queue, which is the source of truth.
   2) we do not need to maintain and propagate up the `consumed bytes` within each iteration here.
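
The approach above can be sketched roughly as follows. `BufferedTask`, `inputBufferSizeInBytes` and the `Action` enum are hypothetical names for this example, and the rule "pause when the total exceeds the limit, otherwise resume" is an assumption about how the decision would be made.

```java
import java.util.List;

// Hypothetical sketch; none of these names are taken from the PR.
final class InputBufferCheckSketch {
    // Stand-in for an active task whose size is derived from its record queues.
    interface BufferedTask {
        long inputBufferSizeInBytes();
    }

    enum Action { PAUSE, RESUME }

    // Recomputes the total from the tasks each time; no running counter is kept.
    static long totalBufferedBytes(final List<? extends BufferedTask> activeTasks) {
        long total = 0L;
        for (final BufferedTask task : activeTasks) {
            total += task.inputBufferSizeInBytes();
        }
        return total;
    }

    // Assumed rule: pause fetching while over the limit, resume once at or below it.
    static Action decide(final List<? extends BufferedTask> activeTasks,
                         final long maxBufferSizeBytes) {
        return totalBufferedBytes(activeTasks) > maxBufferSizeBytes
            ? Action.PAUSE
            : Action.RESUME;
    }
}
```

Calling `decide` once after polling and once after the process loop means `process` can go back to returning just the processed-record count.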




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
