Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/15 10:00:38 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

vamossagar12 opened a new pull request #9756:
URL: https://github.com/apache/kafka/pull/9756


   This PR makes the leader's fsync size-based as well as time-based. That is, if we accumulate a configurable N bytes, we should not wait for linger expiration and should fsync immediately.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r555431717



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       Yeah, I think there should be a configuration here; I am just a little unsure how it should affect the batch size. In the producer, for example, we just expose the batch size directly. We could do the same here, but I'm slightly inclined to keep the batch size decoupled from this configuration. Say for example, that we have a hard-coded 1MB max batch size. Then we could say that the effective batch size is equal to `min(1MB, maxUnflushedSize)`. A user _could_ then define `maxUnflushedSize` to be larger than 1MB and we would not have to deal with the implications of huge batches. Does that make sense?
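
As a rough illustration of the decoupling described above, the following Java sketch computes the effective batch size. The class name, the method name, and the 1MB constant are assumptions made for this example, not code from the PR.

```java
// Illustrative sketch only; these names are hypothetical, not Kafka's API.
final class EffectiveBatchSize {
    // Hard-coded cap on a single record batch (1MB, as in the example above).
    static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024;

    // A single batch never grows past the fixed cap, however large the
    // user-configured unflushed-bytes limit is.
    static int effectiveBatchSize(int maxUnflushedBytes) {
        return Math.min(MAX_BATCH_SIZE_BYTES, maxUnflushedBytes);
    }
}
```

With this shape, a user setting of 4MB would still give 1MB batches, while a setting of 512 bytes would shrink the batch to 512 bytes.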
   
   @jsancio Any thoughts on this?







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r556429212



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       @hachikuji, @jsancio I agree with these points. I am slightly confused on the advantage of letting the config to be set higher than maxBatchSize(1MB) and then limiting the effective batch size to the minimum of `maxUnflushedBytes` and `maxBatchSize`. Won't it lead to confusion in terms of usage?
   In fact, I have followed a similar approach in the PR, in a different way. What I have done is restrict the value of this new config to the range from 0 (which is wrong and needs to change) to maxBatchSize - 1:
   
   
   ```
               .define(QUORUM_FLUSH_MIN_SIZE_BYTES_CONFIG,
                   ConfigDef.Type.INT,
                   1024,
                   between(0, (1024 * 1024) - 1),
                   ConfigDef.Importance.MEDIUM,
                   QUORUM_FLUSH_MIN_SIZE_BYTES_DOC);
   ```
   
   I feel that this way, what the config promises and what actually happens stay the same, and the result is effectively equivalent to `min(maxBatchSize, maxUnflushedBytes)`.
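
The range restriction discussed here can be sketched in plain Java, without depending on Kafka's `ConfigDef`. This is only an illustration: the class name `FlushSizeRange` and the explicit `min` parameter are assumptions, and the thread does not settle what the lower bound should be.

```java
// Illustrative sketch only; not the PR's validator.
final class FlushSizeRange {
    // Assumed fixed maximum batch size of 1MB, as in the discussion.
    static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024;

    // Returns the value unchanged if it lies in [min, MAX_BATCH_SIZE_BYTES - 1],
    // and rejects it otherwise. The lower bound is a parameter because the
    // right minimum is still an open question in this thread.
    static int validate(int value, int min) {
        int max = MAX_BATCH_SIZE_BYTES - 1;
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                "value " + value + " must be in [" + min + ", " + max + "]");
        }
        return value;
    }
}
```

Bounding the config this way means the configured value and the effective value can never differ, at the cost of not allowing a flush threshold above one batch.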
   
   Also @hachikuji , wanted to understand how the newly proposed config; `quorum.append.max.linger.ms` would interplay with the existing `quorum.append.linger.ms` config.  As per my understanding, the moment `quorum.append.linger.ms` is crossed, the flush would start. This happens even in this new implementation irrespective of hitting `maxUnflushedBytes` or not. Are you suggesting that we still hold onto writes until we hit the `quorum.append.max.linger.ms`  thereby overriding `quorum.append.linger.ms`? 
   
   Considering just these 2 configs, I am confused about how this would work. But if we add `maxUnflushedBytes`, then we will probably have to write some logic to hold onto writes between `quorum.append.linger.ms` and `quorum.append.max.linger.ms`. Maybe we allow extra time within this range to let writes accumulate. The moment we hit either of the 2, i.e. `maxUnflushedBytes` or `quorum.append.max.linger.ms`, we flush the writes. This gives me the impression that we are giving more preference to `maxUnflushedBytes`.
   
   Please let me know your thoughts.
   







[GitHub] [kafka] hachikuji commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r552267020



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -59,6 +60,11 @@
     private static final String QUORUM_LINGER_MS_DOC = "The duration in milliseconds that the leader will " +
         "wait for writes to accumulate before flushing them to disk.";
 
+    public static final String QUORUM_FLUSH_MIN_SIZE_BYTES_CONFIG = QUORUM_PREFIX + "flush.minSize.bytes";

Review comment:
       If we decide to keep this, how about we call it `quorum.append.max.unflushed.bytes` or something like that? Basically it's the maximum number of bytes that the raft implementation is allowed to accumulate before forcing an fsync.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       The main thing I'm wondering is if it makes sense to merge `minFlushSize` and `maxBatchSize` into a single configuration. I think that would simplify the implementation a bit since we could then check if `completed` is not empty to know whether to drain. It is clear that `minFlushSize` should be at least as large as `maxBatchSize`, but I'm not sure how useful it is for it to be larger. I was planning to keep `maxBatchSize` a static configuration, so I guess if we want `minFlushSize` to be configurable, then we need to allow for `minFlushSize` to be larger. What do you think?

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -211,6 +214,33 @@ public long timeUntilDrain(long currentTimeMs) {
         }
     }
 
+    /**
+     * Check if the current batch size has exceeded the min flush size.
+     *
+     * Note that this method works on best effort i.e it tries to acquire the append lock and if it can't
+     * then instead of blocking, it returns false.
+     *
+     * This means that if the thread responsible for appending is holding the lock and the linger time hasn't expired
+     * yet, then even though the batch size exceeds the min flush size, the records won't be drained as the lock
+     * couldn't be acquired. This also means that in subsequent run(s), this method should be able to acquire the lock
+     * and return true in the event the linger time hasn't expired yet.
+     *
+     * @return true if the append lock could be acquired and the accumulated bytes are greater than configured min flush
+     * bytes size, false otherwise.
+     */
+    public boolean batchSizeExceedsMinFlushSize() {

Review comment:
       I think we need to look at completed batches in here as well, right? To simplify the implementation, I think we could do something like the following:
   
   1. Inside `append`, while we are already holding the lock, we can check if the accumulated bytes (including `completed` and `currentBatch`) have reached `minFlushSize`. If so, we can call `completeCurrentBatch` to ensure that `completed` holds all the data that needs to be drained.
   2. Inside `timeUntilDrain`, if the linger timer hasn't been reached, we can iterate `completed` and check if there are enough bytes to flush. Then we don't need to acquire the lock unless we need to drain.
   
   Would that work?
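
The two steps above could be sketched roughly as follows. This is a simplified, single-threaded model and not the PR's code: the class name `SizeAwareAccumulator`, the use of plain integer batch sizes, and the `Long.MAX_VALUE` return value for an empty accumulator are assumptions for illustration, and the append lock is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch; all names are hypothetical and locking is omitted.
final class SizeAwareAccumulator {
    private final long lingerMs;
    private final int minFlushSize;
    // Sizes in bytes of batches that are ready to be drained.
    private final List<Integer> completed = new ArrayList<>();
    private int currentBatchBytes = 0;
    // Time at which the linger timer expires; -1 while nothing is pending.
    private long lingerDeadlineMs = -1;

    SizeAwareAccumulator(long lingerMs, int minFlushSize) {
        this.lingerMs = lingerMs;
        this.minFlushSize = minFlushSize;
    }

    // Step 1: in the real class this would run while holding the append lock.
    void append(int recordBytes, long nowMs) {
        if (lingerDeadlineMs < 0) {
            lingerDeadlineMs = nowMs + lingerMs;
        }
        currentBatchBytes += recordBytes;
        // Count both completed batches and the in-progress batch.
        if (completedBytes() + currentBatchBytes >= minFlushSize) {
            completeCurrentBatch();
        }
    }

    // Step 2: only looks at completed batches, so no lock is needed here.
    long timeUntilDrain(long nowMs) {
        if (lingerDeadlineMs < 0) {
            return Long.MAX_VALUE; // nothing pending (assumed convention)
        }
        if (completedBytes() >= minFlushSize) {
            return 0L; // size threshold reached: drain without waiting for linger
        }
        return Math.max(0L, lingerDeadlineMs - nowMs);
    }

    private void completeCurrentBatch() {
        if (currentBatchBytes > 0) {
            completed.add(currentBatchBytes);
            currentBatchBytes = 0;
        }
    }

    private int completedBytes() {
        int total = 0;
        for (int size : completed) {
            total += size;
        }
        return total;
    }
}
```

For example, with `lingerMs = 25` and `minFlushSize = 120`, appending 50 bytes at time 0 leaves 15 ms until drain at time 10, and appending a further 80 bytes makes `timeUntilDrain` return 0 immediately.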







[GitHub] [kafka] hachikuji commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r555466107



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       Batching has a lot of implications in the system. The flush behavior is just one. It also impacts the segment size and the ability to do down-conversion efficiently since we have to read the whole batch into memory. Basically any time we need to do anything in memory with the batches, we have to allocate at least enough memory to hold the largest batch. The controller is designed to write small messages, so I do not think 1MB (say) would be much of a constraint. For example, we have avoided storing assignment state in a single message as we did with Zookeeper. We can reconsider it if ever needed, but I'd like to keep batches relatively small if possible.







[GitHub] [kafka] hachikuji commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r560627883



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       > I am slightly confused on the advantage of letting the config to be set higher than maxBatchSize(1MB) and then limiting the effective batch size to the minimum of maxUnflushedBytes and maxBatchSize. Won't it lead to confusion in terms of usage?
   
   Not sure if this is the cause of the confusion, but just to be clear, what I'm referring to as the batch size is just a limit to the size of a `DefaultRecordBatch`. Records in Kafka are grouped together into batches with a schema roughly like this:
   ```
   Batch => Size Offset Crc ... [Record]
   ```
   It is useful to keep batches from getting too large because we don't have a way to index into a batch without storing the whole decompressed contents in memory. 
   
   However, we can let `maxUnflushedBytes` be as large as we want. That just means that we will collect multiple batches (i.e. multiple `CompletedBatch` instances) as the code already does. So if `maxUnflushedBytes` is set to 4MB, then we will end up collecting 4 batches before we flush to disk. We could also require that `maxUnflushedBytes` be less than or equal to our desired max batch size, but if it's a similar level of effort to support the more general config, then that seems better.
   
   > Also @hachikuji , wanted to understand how the newly proposed config; quorum.append.max.linger.ms would interplay with the existing quorum.append.linger.ms config. As per my understanding, the moment quorum.append.linger.ms is crossed, the flush would start. This happens even in this new implementation irrespective of hitting maxUnflushedBytes or not. Are you suggesting that we still hold onto writes until we hit the quorum.append.max.linger.ms thereby overriding quorum.append.linger.ms?
   
   My expectation is that we flush after either of these are reached. So if the linger time is hit first, then we take whatever unflushed bytes we have even if they are smaller than `maxUnflushedBytes`. On the other hand, if `maxUnflushedBytes` is reached, then we ignore linger. I think this is consistent with the implementation I suggested here: https://github.com/apache/kafka/pull/9756#discussion_r552268919. Basically we override `timeUntilDrain` so that it returns 0 when `maxUnflushedBytes` is reached.
   
   Hope that helps!







[GitHub] [kafka] hachikuji commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r560627883



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       > I am slightly confused on the advantage of letting the config to be set higher than maxBatchSize(1MB) and then limiting the effective batch size to the minimum of maxUnflushedBytes and maxBatchSize. Won't it lead to confusion in terms of usage?
   
   Not sure if this is the cause of the confusion, but just to be clear, what I'm referring to as the batch size is just a limit to the size of a `DefaultRecordBatch`. Records in Kafka are grouped together into batches with a schema roughly like this:
   ```
   Batch => Size Offset Crc ... [Record]
   ```
   It is useful to keep batches from getting too large because we don't have a way to index into a batch without storing the whole decompressed contents in memory. 
   
   However, we can let `maxUnflushedBytes` be as large as we want. That just means that we will collect multiple batches (i.e. multiple `CompletedBatch` instances) as the code already does. So if `maxUnflushedBytes` is set to 4MB, then we will end up collecting 4x1MB batches before we flush to disk rather than collecting one big 1x4MB batch. 
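
To make the 4x1MB arithmetic concrete, here is a small Java sketch. It assumes, for illustration only, that incoming bytes fill each batch completely before the next one starts. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not the accumulator's actual batching logic.
final class BatchCountSketch {
    // Returns the sizes of the batches collected before a flush is forced,
    // assuming each batch is filled up to maxBatchSize before the next starts.
    static List<Integer> batchSizesBeforeFlush(int maxUnflushedBytes, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        int remaining = maxUnflushedBytes;
        while (remaining > 0) {
            int size = Math.min(remaining, maxBatchSize);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }
}
```

Under that assumption, `maxUnflushedBytes` of 4MB with a 1MB batch cap gives four 1MB batches, and a value below the cap gives a single smaller batch.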
   
   We could also require that `maxUnflushedBytes` be less than or equal to our desired max batch size, but if it's a similar level of effort to support the more general config, then that seems better.
   
   > Also @hachikuji , wanted to understand how the newly proposed config; quorum.append.max.linger.ms would interplay with the existing quorum.append.linger.ms config. As per my understanding, the moment quorum.append.linger.ms is crossed, the flush would start. This happens even in this new implementation irrespective of hitting maxUnflushedBytes or not. Are you suggesting that we still hold onto writes until we hit the quorum.append.max.linger.ms thereby overriding quorum.append.linger.ms?
   
   My expectation is that we flush after either of these are reached. So if the linger time is hit first, then we take whatever unflushed bytes we have even if they are smaller than `maxUnflushedBytes`. On the other hand, if `maxUnflushedBytes` is reached, then we ignore linger. I think this is consistent with the implementation I suggested here: https://github.com/apache/kafka/pull/9756#discussion_r552268919. Basically we override `timeUntilDrain` so that it returns 0 when `maxUnflushedBytes` is reached.
   
   Hope that helps!







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593032627



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";
     public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
       I see what you are trying to say. Well, the original premise of this ticket was to trigger an fsync the moment a configured number of bytes has been accumulated. Here is the original description from Jason in the ticket:
   
   > In KAFKA-10601, we implemented linger semantics similar to the producer to let the leader accumulate a batch of writes before fsyncing them to disk. Currently the fsync is only based on the linger time, but it would be helpful to make it size-based as well. In other words, if we accumulate a configurable N bytes, then we should not wait for linger expiration and should just fsync immediately.
   
   But as you pointed out, that is also because batch append and fsync go hand in hand in the current implementation.
   
   With the future work on deferring fsync, this config might affect only the batch appends. With that in mind, IMO it makes sense to rename it to `append.linger.bytes`. It also matches `append.linger.ms`.
   
   BTW, on the fsync deferral track, I have created a draft PR where I have outlined my approach: https://github.com/apache/kafka/pull/10278
   
   I request you or Jason to review it.







[GitHub] [kafka] jsancio commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r555462779



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       > Say for example, that we have a hard-coded 1MB max batch size. Then we could say that the effective batch size is equal to min(1MB, maxUnflushedSize)
   
   Hmm. I am not sure about the right behaviour here. You probably already discussed this and I missed it, but can you elaborate on why we want to have a minimum for the maximum batch size?
   
   Are you thinking that we need to have a maximum batch size that is at least as large as the smallest record that the Kafka Controller can generate?







[GitHub] [kafka] jsancio commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r555470725



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -59,6 +60,11 @@
     private static final String QUORUM_LINGER_MS_DOC = "The duration in milliseconds that the leader will " +
         "wait for writes to accumulate before flushing them to disk.";
 
+    public static final String QUORUM_FLUSH_MIN_SIZE_BYTES_CONFIG = QUORUM_PREFIX + "flush.minSize.bytes";

Review comment:
       I like the suggestion of using `quorum.append.max.unflushed.bytes`. I would mention in the documentation that this is best effort. Based on how `KafkaRaftClient` calls drain, it is possible for the accumulator to have more unflushed bytes.







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r553472551



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       The way I thought about it is to still keep it configurable and keep it within the bounds of maxBatchSize. With that, users would have the option of flushing more frequently based on size if it suits them, rather than waiting for time-bound flushes. We should still enforce a lower bound for this config; otherwise the fsyncs can become too frequent, which can have adverse effects. I agree that setting it higher than `maxBatchSize` might also not be very useful, as that could lead to delayed fsyncs if the linger time has also been set to a higher value.
   
   Having said that, IMO it might be useful to keep minFlushSize configurable, which gives users more knobs to control the behaviour based on their needs (even though more knobs can sometimes mean more confusion). Do you think it makes sense?







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r553481214



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -211,6 +214,33 @@ public long timeUntilDrain(long currentTimeMs) {
         }
     }
 
+    /**
+     * Check if the current batch size has exceeded the min flush size.
+     *
+     * Note that this method works on best effort i.e it tries to acquire the append lock and if it can't
+     * then instead of blocking, it returns false.
+     *
+     * This means that if the thread responsible for appending is holding the lock and the linger time hasn't expired
+     * yet, then even though the batch size exceeds the min flush size, the records won't be drained as the lock
+     * couldn't be acquired. This also means that in subsequent run(s), this method should be able to acquire the lock
+     * and return true in the event the linger time hasn't expired yet.
+     *
+     * @return true if the append lock could be acquired and the accumulated bytes are greater than configured min flush
+     * bytes size, false otherwise.
+     */
+    public boolean batchSizeExceedsMinFlushSize() {

Review comment:
       Yeah, completed batches should be considered. Also, what you have suggested removes the need to acquire the lock again in batchSizeExceedsMinFlushSize. Basically, we won't even need this method anymore.







[GitHub] [kafka] vamossagar12 commented on pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#issuecomment-768979520


   @hachikuji @jsancio I have made the changes. Please review.





[GitHub] [kafka] jsancio commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r566237380



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -474,6 +474,7 @@ public void testAccumulatorClearedAfterBecomingFollower() throws Exception {
             .thenReturn(buffer);
 
         RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withMaxUnflushedBytes(KafkaRaftClient.MAX_BATCH_SIZE)

Review comment:
       This comment applies to a few places in this file.
   
   Is there a reason why we override this value?

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
         assertEquals(3L, context.log.endOffset().offset);
     }
 
+    @Test
+    public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+        // This test verifies that the client will get woken up immediately
+        // if the linger timeout has expired during an append

Review comment:
       > "if the linger timeout..."
   
   Did you mean minimum flush bytes?

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1078,6 +1078,7 @@ private static FetchResponseData snapshotFetchResponse(
     private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
         return new SnapshotWriter<>(
             snapshot,
+            1024,

Review comment:
       In the snapshot tests, we can set the `maxUnflushedBytes` to the same value as `maxBatchSize`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";
     public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";
+    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_DOC = "The maximum number of bytes that the leader " +
+        "will allow to be accumulated before flushing them to disk ";

Review comment:
       ```suggestion
       public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_DOC = "The maximum number of bytes that the leader " +
           "will allow to be accumulated before appending to the topic partition.";
   ```

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
         assertEquals(3L, context.log.endOffset().offset);
     }
 
+    @Test
+    public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+        // This test verifies that the client will get woken up immediately
+        // if the linger timeout has expired during an append
+
+        int localId = 0;
+        int otherNodeId = 1;
+        int minFlushSizeInBytes = 120;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withMaxUnflushedBytes(minFlushSizeInBytes)

Review comment:
       The local variable is called `minFlushSizeInBytes` yet the `KafkaRaftClient` uses `maxUnflushedBytes`. Did we agree to use `minUnflushedBytes`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";

Review comment:
       Outside the scope of this PR but how about changing this description to:
   > The duration in milliseconds that the leader will wait for writes to accumulate before appending to the topic partition.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";
     public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
       @vamossagar12 and @hachikuji  How about we call this `append.linger.bytes`?
   
   Excuse the back and forth on this name, but let me explain.
   
   In the current implementation this does determine when the bytes will be flushed, because the `KafkaRaftClient` flushes the `Log` every time it drains the `BatchAccumulator` and appends to the log.
   
   https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1871-L1879
   
   In the past, we have talked about delaying the flush/fsync to the log. I believe that the invariant that we need to satisfy is that the leader cannot increase the high-watermark past the flushed offsets. Followers need to flush/fsync before sending a Fetch request, since the leader assumes that the followers have safely replicated the offset in the Fetch request.
   
   If in the future we implement this logic or something similar, I think the name `append.max.unflushed.bytes` would not be accurate since it contains the word "unflushed". And the leader may decide to have more than `append.max.unflushed.bytes` bytes unflushed.
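
The invariant described in this comment can be pictured with a small sketch. This is not code from `KafkaRaftClient`: the class name `HighWatermarkSketch`, the method `boundedHighWatermark`, and both parameters are hypothetical, and the sketch assumes a possible future design in which the leader's fsync is allowed to lag behind its appends.

```java
/** Hypothetical illustration only; not part of the Kafka raft module. */
final class HighWatermarkSketch {
    /**
     * @param replicatedOffset    offset the quorum would otherwise allow as the high-watermark (assumed input)
     * @param leaderFlushedOffset last offset the leader has fsynced to its own log (assumed input)
     * @return the high-watermark, capped so it never passes the leader's flushed offset
     */
    static long boundedHighWatermark(long replicatedOffset, long leaderFlushedOffset) {
        // The leader must not expose offsets it has not yet flushed.
        return Math.min(replicatedOffset, leaderFlushedOffset);
    }
}
```

Under such a design the leader could hold more unflushed bytes than the configured value, which is the naming concern raised here.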







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019808



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
         assertEquals(3L, context.log.endOffset().offset);
     }
 
+    @Test
+    public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+        // This test verifies that the client will get woken up immediately
+        // if the linger timeout has expired during an append
+
+        int localId = 0;
+        int otherNodeId = 1;
+        int minFlushSizeInBytes = 120;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withMaxUnflushedBytes(minFlushSizeInBytes)

Review comment:
       yeah that was an oversight as well. Changed.







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r556429212



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       @hachikuji , @jsancio I agree with the points. I am slightly confused about the advantage of letting the config be set higher than maxBatchSize (1MB) and then limiting the effective batch size to the minimum of `maxUnflushedBytes` and `maxBatchSize`. Won't it lead to confusion in terms of usage?
   In fact, I have followed a similar approach in the PR in a different way. What I have done is restrict the value of this new config to a range from 0 (which is wrong, need to change) to maxBatchSize - 1:
   
   ```
               .define(QUORUM_FLUSH_MIN_SIZE_BYTES_CONFIG,
                   ConfigDef.Type.INT,
                   1024,
                   between(0, (1024 * 1024) - 1),
                   ConfigDef.Importance.MEDIUM,
                   QUORUM_FLUSH_MIN_SIZE_BYTES_DOC);
   ```
   
   I feel that this way, what the config provides and what actually happens remain the same, and effectively this will be equivalent to `min(maxBatchSize, maxUnflushedBytes)`.
   
   Let me know your thoughts plz.







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593024705



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -474,6 +474,7 @@ public void testAccumulatorClearedAfterBecomingFollower() throws Exception {
             .thenReturn(buffer);
 
         RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withMaxUnflushedBytes(KafkaRaftClient.MAX_BATCH_SIZE)

Review comment:
       Yeah. The reason is that in this PR @hachikuji suggested changing the logic for setting maxBatchSize in BatchAccumulator as follows:
   
   `this.maxBatchSize = Math.min(maxBatchSize, maxUnflushedBytes);`
   
   That is why I am setting some combinations of values to check that it behaves correctly.







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593033011



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";

Review comment:
       I updated the doc for the maxUnflushedBytes config.







[GitHub] [kafka] vamossagar12 commented on pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#issuecomment-773204238


   @hachikuji did you get a chance to review this?





[GitHub] [kafka] hachikuji commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r560627883



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       > I am slightly confused on the advantage of letting the config to be set higher than maxBatchSize(1MB) and then limiting the effective batch size to the minimum of maxUnflushedBytes and maxBatchSize. Won't it lead to confusion in terms of usage?
   
   Not sure if this is the cause of the confusion, but just to be clear, what I'm referring to as the batch size is just a limit to the size of a `DefaultRecordBatch`. Records in Kafka are grouped together into batches with a schema roughly like this:
   ```
   Batch => Size Offset Crc ... [Record]
   ```
   It is useful to keep batches from getting too large because we don't have a way to index into a batch without storing the whole decompressed contents in memory. 
   
   However, we can let `maxUnflushedBytes` be as large as we want. That just means that we will collect multiple batches (i.e. multiple `CompletedBatch` instances) as the code already does. So if `maxUnflushedBytes` is set to 4MB, then we will end up collecting four 1MB batches before we flush to disk rather than collecting one big 4MB batch.
   
   We could also require that `maxUnflushedBytes` be less than or equal to our desired max batch size, but if it's a similar level of effort to support the more general config, then that seems better.
   
   > Also @hachikuji , wanted to understand how the newly proposed config; quorum.append.max.linger.ms would interplay with the existing quorum.append.linger.ms config. As per my understanding, the moment quorum.append.linger.ms is crossed, the flush would start. This happens even in this new implementation irrespective of hitting maxUnflushedBytes or not. Are you suggesting that we still hold onto writes until we hit the quorum.append.max.linger.ms thereby overriding quorum.append.linger.ms?
   
   My expectation is that we flush as soon as either of these is reached. So if the linger time is hit first, then we take whatever unflushed bytes we have, even if they total less than `maxUnflushedBytes`. On the other hand, if `maxUnflushedBytes` is reached, then we ignore linger. I think this is consistent with the implementation I suggested here: https://github.com/apache/kafka/pull/9756#discussion_r552268919. Basically we override `timeUntilDrain` so that it returns 0 when `maxUnflushedBytes` is reached.
   
   Hope that helps!
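
A minimal sketch of the "whichever comes first" rule described in this comment, written as pure functions so the arithmetic is easy to check. It is not the `BatchAccumulator` code: the class `LingerSketch`, both method names, and all parameters are hypothetical, and the 1MB batch limit is the example value used in this thread.

```java
/** Hypothetical illustration only; not the real BatchAccumulator. */
final class LingerSketch {
    /**
     * Returns 0 once the size threshold is reached, otherwise the remaining linger time.
     *
     * @param lingerMs          configured linger time in milliseconds
     * @param elapsedMs         time since the first un-drained append
     * @param unflushedBytes    bytes accumulated so far (completed plus current batch)
     * @param maxUnflushedBytes size threshold that overrides the linger timer
     */
    static long timeUntilDrain(long lingerMs, long elapsedMs, long unflushedBytes, long maxUnflushedBytes) {
        if (unflushedBytes >= maxUnflushedBytes) {
            return 0L; // size threshold reached: ignore linger
        }
        return Math.max(0L, lingerMs - elapsedMs); // otherwise wait out the linger time
    }

    /** Smallest number of batches of at most maxBatchSize bytes that can hold maxUnflushedBytes. */
    static long batchesNeeded(long maxUnflushedBytes, long maxBatchSize) {
        return (maxUnflushedBytes + maxBatchSize - 1) / maxBatchSize; // ceiling division
    }
}
```

With a 1MB batch limit, `batchesNeeded(4 * 1024 * 1024, 1024 * 1024)` evaluates to 4, matching the 4MB example in the comment.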







[GitHub] [kafka] jsancio commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r555460647



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       I think it is important to agree on the current (`trunk`) behavior of `BatchAccumulator`. It is my understanding that the `BatchAccumulator` creates an in-memory batch when `append` has been called enough times with enough data such that the sum of record sizes is greater than `maxBatchSize` (https://github.com/apache/kafka/pull/9756/files#diff-b7a2129b03764fbafab69c81e985d8ac6006d55f95307f0e21c70fb4750f61b5R145-R148) **OR** the first un-drained `append` is `lingerMs` old (https://github.com/apache/kafka/pull/9756/files#diff-b7a2129b03764fbafab69c81e985d8ac6006d55f95307f0e21c70fb4750f61b5R278).
   
   I see two issues with the current implementation (`trunk`):
   
   1. The draining thread will not call `drain`, even if `completed` is not empty, until `lingerMs` has expired (https://github.com/apache/kafka/pull/9756/files#diff-b7a2129b03764fbafab69c81e985d8ac6006d55f95307f0e21c70fb4750f61b5R254-R255).
   
   Can this issue be addressed by checking if `completed` is non-empty in `timeUntilDrain`?
   
   2. The KafkaRaftClient doesn't expose a way to configure the `BatchAccumulator`'s `maxBatchSize`.
   
   Can this issue be addressed by adding this configuration to `RaftConfig` and the second constructor to `KafkaRaftClient` (https://github.com/apache/kafka/pull/9756/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R179)?
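
The first question can be sketched roughly as follows. This is an assumption-laden illustration, not the `trunk` code: `DrainCheckSketch`, `startLinger`, and `complete` are hypothetical names, batches are stood in for by byte arrays, and the sketch ignores thread-safety of the deadline field.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical illustration only; not the real BatchAccumulator. */
final class DrainCheckSketch {
    private final ConcurrentLinkedQueue<byte[]> completed = new ConcurrentLinkedQueue<>();
    private final long lingerMs;
    // Long.MAX_VALUE means nothing is pending, so no linger timer is running.
    private long lingerDeadlineMs = Long.MAX_VALUE;

    DrainCheckSketch(long lingerMs) {
        this.lingerMs = lingerMs;
    }

    /** Starts the linger timer on the first un-drained append. */
    void startLinger(long nowMs) {
        if (lingerDeadlineMs == Long.MAX_VALUE) {
            lingerDeadlineMs = nowMs + lingerMs;
        }
    }

    /** Records a finished batch that is ready to be drained. */
    void complete(byte[] batch) {
        completed.add(batch);
    }

    long timeUntilDrain(long nowMs) {
        if (!completed.isEmpty()) {
            return 0L; // a completed batch is waiting: drain now instead of waiting for linger
        }
        if (lingerDeadlineMs == Long.MAX_VALUE) {
            return Long.MAX_VALUE; // nothing pending
        }
        return Math.max(0L, lingerDeadlineMs - nowMs);
    }
}
```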







[GitHub] [kafka] vamossagar12 commented on pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#issuecomment-786004816


   @hachikuji , there were some merge conflicts which needed to be resolved. Can you plz review the PR whenever you get the chance ?





[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r606198207



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";
     public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
       hey @jsancio .. sorry to bother again, but could you plz review this whenever you get the chance?







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r616018340



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";
     public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
       hi @jsancio / @hachikuji  could you plz review the PR whenever you get the chance?







[GitHub] [kafka] vamossagar12 commented on pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#issuecomment-850069063


   @jsancio could you plz review this PR whenever you get the chance?





[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019531



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1078,6 +1078,7 @@ private static FetchResponseData snapshotFetchResponse(
     private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
         return new SnapshotWriter<>(
             snapshot,
+            1024,

Review comment:
       done







[GitHub] [kafka] hachikuji commented on pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#issuecomment-773479357


   @vamossagar12 Thanks, I missed the update. I will review today.





[GitHub] [kafka] jsancio commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r555472538



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -211,6 +214,33 @@ public long timeUntilDrain(long currentTimeMs) {
         }
     }
 
+    /**
+     * Check if the current batch size has exceeded the min flush size.
+     *
+     * Note that this method works on best effort i.e it tries to acquire the append lock and if it can't
+     * then instead of blocking, it returns false.
+     *
+     * This means that if the thread responsible for appending is holding the lock and the linger time hasn't expired
+     * yet, then even though the batch size exceeds the min flush size, the records won't be drained as the lock
+     * couldn't be acquired. This also means that in subsequent run(s), this method should be able to acquire the lock
+     * and return true in the event the linger time hasn't expired yet.
+     *
+     * @return true if the append lock could be acquired and the accumulated bytes are greater than configured min flush
+     * bytes size, false otherwise.
+     */
+    public boolean batchSizeExceedsMinFlushSize() {

Review comment:
       > 1. Inside append, while we are already holding the lock, we can check if the accumulated bytes (including completed and currentBatch) have reached minFlushSize. If so, we can call completeCurrentBatch to ensure that completed holds all the data that needs to be drained.
   > 2. Inside timeUntilDrain, if the linger timer hasn't been reached, we can iterate completed and check if there are enough bytes to flush. Then we don't need to acquire the lock unless we need to drain.
   
   I like this suggestion.
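
The two quoted steps might look roughly like this. It is a sketch under assumptions, not the PR's implementation: `AppendSideFlushCheck` and `hasEnoughBytesToFlush` are hypothetical names, batches are represented only by their byte counts, and the linger timer is left out so the locking pattern stays visible.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration only; not the real BatchAccumulator. */
final class AppendSideFlushCheck {
    private final ReentrantLock appendLock = new ReentrantLock();
    // Sizes of completed batches; readable without holding appendLock.
    private final ConcurrentLinkedQueue<Integer> completed = new ConcurrentLinkedQueue<>();
    private final int minFlushSize;
    private int currentBatchBytes = 0; // guarded by appendLock

    AppendSideFlushCheck(int minFlushSize) {
        this.minFlushSize = minFlushSize;
    }

    /** Step 1: the size check happens while append already holds the lock. */
    void append(int recordBytes) {
        appendLock.lock();
        try {
            currentBatchBytes += recordBytes;
            if (completedBytes() + currentBatchBytes >= minFlushSize) {
                completeCurrentBatch(); // move everything that must be drained into completed
            }
        } finally {
            appendLock.unlock();
        }
    }

    private void completeCurrentBatch() {
        if (currentBatchBytes > 0) {
            completed.add(currentBatchBytes);
            currentBatchBytes = 0;
        }
    }

    private int completedBytes() {
        int total = 0;
        for (int size : completed) {
            total += size;
        }
        return total;
    }

    /** Step 2: the draining side only iterates completed, so it needs no lock. */
    boolean hasEnoughBytesToFlush() {
        return completedBytes() >= minFlushSize;
    }
}
```

The design choice here is that the draining thread never contends for the append lock just to find out whether a drain is due.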







[GitHub] [kafka] vamossagar12 commented on pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#issuecomment-749304889


   @hachikuji , did you get a chance to review this one?





[GitHub] [kafka] hachikuji commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r556118897



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       It sounds like we are basically on the same page here. I think we agree that there should be a configuration. As suggested in the other comment, I like `quorum.append.max.unflushed.bytes` to go along with `quorum.append.linger.ms`. Perhaps we could even use `quorum.append.max.linger.ms` to make it clear that it is the maximum amount of time that the server will hold onto a write before flushing.
   
   In regard to the question of setting the batch size, I suggest we stick with the hard-coded value for now and set the effective value to be the minimum of the hard-coded value and `max.unflushed.bytes`. We can file a separate jira to consider whether this should be configurable separately or whether we can allow `max.unflushed.bytes` to specify the batch size precisely.
   
   @vamossagar12 Does that sound reasonable to you?
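
As a rough sketch of the proposal in this comment: the effective batch size is the smaller of the hard-coded limit and the configured value. The class name `EffectiveBatchSize` and the 1MB constant are assumptions for illustration (1MB is the example figure used in this thread, not a confirmed value from the code).

```java
/** Hypothetical illustration only; not the real BatchAccumulator. */
final class EffectiveBatchSize {
    // Assumed hard-coded limit, using the 1MB example from the discussion.
    static final int MAX_BATCH_SIZE = 1024 * 1024;

    /** A value of max.unflushed.bytes below the limit shrinks the batch size; a larger one does not grow it. */
    static int effectiveBatchSize(int maxUnflushedBytes) {
        return Math.min(MAX_BATCH_SIZE, maxUnflushedBytes);
    }
}
```

With these assumptions, a setting of 1024 bytes gives 1024-byte batches, while a setting of 4MB still gives 1MB batches.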







[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019665



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
         assertEquals(3L, context.log.endOffset().offset);
     }
 
+    @Test
+    public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+        // This test verifies that the client will get woken up immediately
+        // if the linger timeout has expired during an append

Review comment:
       yes.. changed







[GitHub] [kafka] hachikuji commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r555466107



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
     private final Time time;
     private final SimpleTimer lingerTimer;
     private final int lingerMs;
+    private final int minFlushSize;

Review comment:
       Batching has a lot of implications in the system. The flush behavior is just one. It also impacts the segment size and the ability to do down-conversion efficiently since we have to read the whole batch into memory. The controller is designed to write small messages, so I do not think 1MB (say) would be much of a constraint. For example, we have avoided storing assignment state in a single message as we did with ZooKeeper. We can reconsider it if ever needed, but I'd like to keep batches relatively small if possible.



