Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/05 01:59:28 UTC

[GitHub] [kafka] jsancio opened a new pull request #10063: KAFKA-12258: Add support for splitting appending records

jsancio opened a new pull request #10063:
URL: https://github.com/apache/kafka/pull/10063


   1. Type `BatchAccumulator`. Add support for appending records into one or more batches.
   2. Type `RaftClient`. Rename `scheduleAppend` to `scheduleAtomicAppend`.
   3. Type `RaftClient`. Add a new method `scheduleAppend` which appends records to the log using as many batches as necessary.
   4. Increase the batch size from 1MB to 8MB.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10063:
URL: https://github.com/apache/kafka/pull/10063


   





[GitHub] [kafka] mumrah commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r572957037



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,24 +80,69 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {

Review comment:
       Since these two `append` methods are mostly identical, can we use a single private method with `isAtomic` as an argument? Otherwise it seems we will need to update both methods in most cases as we add features and fix bugs.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,24 +80,69 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {
         if (epoch != this.epoch) {
-            // If the epoch does not match, then the state machine probably
-            // has not gotten the notification about the latest epoch change.
-            // In this case, ignore the append and return a large offset value
-            // which will never be committed.
+            return Long.MAX_VALUE;
+        }
+
+        ObjectSerializationCache serializationCache = new ObjectSerializationCache();
+
+        appendLock.lock();
+        try {
+            maybeCompleteDrain();
+
+            for (T record : records) {
+                BatchBuilder<T> batch = maybeAllocateBatch(

Review comment:
       Do we need to create a batch for each record? What is the overhead associated with a batch? 







[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r574075250



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,24 +80,69 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {

Review comment:
       Yeah. I thought about doing this but I felt that the implementation was very subtle and hard to read. I used this strategy for `private Long append(int epoch, List<T> records, boolean isAtomic)` in `KafkaRaftClient` since the difference there is obvious.
   
   In the `BatchAccumulator` for the "atomic" version we need to calculate the total size and allocate a buffer/batch if needed that fits that total size. For the non-atomic version we need to allocate a buffer/batch for every record if it doesn't fit the current batch.
   
   I'll create a commit for this so you can see the two side by side.
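
The difference between the two strategies described above can be sketched as follows. This is a minimal, self-contained illustration and not the `BatchAccumulator` code: the class `BatchingSketch`, the methods `splitAtomic` and `splitNonAtomic`, and the use of plain `int` sizes in place of serialized records are all assumptions made for the example, and batch header overhead is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names and types are not taken from the Kafka code base.
public class BatchingSketch {
    // Atomic: the total size is computed up front and every record must fit
    // into a single batch, otherwise the whole append is rejected.
    public static List<List<Integer>> splitAtomic(List<Integer> recordSizes, int maxBatchSize) {
        long total = 0;
        for (int size : recordSizes) {
            total += size;
        }
        if (total > maxBatchSize) {
            throw new IllegalArgumentException(
                "Total size " + total + " exceeds the maximum batch size " + maxBatchSize);
        }
        List<List<Integer>> batches = new ArrayList<>();
        batches.add(new ArrayList<>(recordSizes));
        return batches;
    }

    // Non-atomic: records are considered one at a time, and a new batch is
    // started whenever the next record does not fit into the current one.
    public static List<List<Integer>> splitNonAtomic(List<Integer> recordSizes, int maxBatchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentSize = 0;
        for (int size : recordSizes) {
            if (size > maxBatchSize) {
                throw new IllegalArgumentException(
                    "Record of size " + size + " exceeds the maximum batch size " + maxBatchSize);
            }
            if (currentSize + size > maxBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(size);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

With a maximum batch size of 8, three records of size 4 are rejected by `splitAtomic` but are split into two batches (two records, then one) by `splitNonAtomic`.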







[GitHub] [kafka] jsancio commented on pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#issuecomment-779420827


   @mumrah Ready for review. Fixed the size calculation issues that I discovered when implementing this PR.





[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578711429



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##########
@@ -307,4 +309,38 @@ public int writeRecord(
         recordOutput.writeVarint(0);
         return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
+
+    private int batchHeaderSizeInBytes() {
+        return AbstractRecords.recordBatchHeaderSizeInBytes(
+            RecordBatch.MAGIC_VALUE_V2,
+            compressionType
+        );
+    }
+
+    private int bytesNeededForRecords(
+        Collection<T> records,
+        ObjectSerializationCache serializationCache
+    ) {
+        long expectedNextOffset = nextOffset;
+        int bytesNeeded = 0;
+        for (T record : records) {
+            if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;

Review comment:
       True. I think the cleanest solution is to throw in this case.
   
   I was thinking of returning `OptionalInt.of(Integer.MAX_VALUE)` and checking for it in the `BatchAccumulator`. With that solution we can't distinguish between two cases: the current batch plus the new records exceeds this constraint, or the new records alone, added to an empty batch, exceed it.
   
   I think throwing an exception is fine since this is a fatal error. It means that we have at least `Integer.MAX_VALUE` records, which would most likely result in an OOM exception somewhere else in the system.







[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r577216920



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -77,6 +79,29 @@ default void handleResign() {}
      */
     void register(Listener<T> listener);
 
+    /**
+     * Append a list of records to the log. The write will be scheduled for some time
+     * in the future. There is no guarantee that appended records will be written to
+     * the log and eventually committed. While the order of the records is preserve, they can
+     * be appended to the log using one or more batches. This means that each record could
+     * be committed independently.

Review comment:
       It might already be clear enough given the previous sentence, but maybe we could emphasize that if any record becomes committed, then all records ordered before it are guaranteed to be committed as well.
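
The ordering guarantee mentioned here can be illustrated with a small sketch. It assumes that the appended records occupy consecutive offsets starting at some base offset and that every offset below a commit point (called `highWatermark` here) is committed; the class `PrefixCommitSketch` and the method `committedPrefix` are hypothetical names for the example, not part of `RaftClient`.

```java
import java.util.List;

// Hypothetical sketch; names are not taken from the Kafka code base.
public class PrefixCommitSketch {
    // Returns the committed records, assuming the records occupy consecutive
    // offsets starting at baseOffset and every offset below highWatermark is
    // committed. The result is always a prefix of the input list.
    public static <T> List<T> committedPrefix(List<T> records, long baseOffset, long highWatermark) {
        long committedCount = Math.max(0L, Math.min((long) records.size(), highWatermark - baseOffset));
        return records.subList(0, (int) committedCount);
    }
}
```

Because the committed records always form a prefix of the appended list, a record can only be committed if every record ordered before it is committed too, even when the records were split across several batches.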

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,67 +80,110 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {
+        return append(epoch, records, false);
+    }
+
+    /**
+     * Append a list of records into an atomic batch. We guarantee all records are included in the
+     * same underlying record batch so that either all of the records become committed or none of
+     * them do.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in a batch
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
+     *         batch size; if this exception is throw none of the elements in records were
+     *         committed
+     */
+    public Long appendAtomic(int epoch, List<T> records) {
+        return append(epoch, records, true);
+    }
+
+    private Long append(int epoch, List<T> records, boolean isAtomic) {
         if (epoch != this.epoch) {
-            // If the epoch does not match, then the state machine probably
-            // has not gotten the notification about the latest epoch change.
-            // In this case, ignore the append and return a large offset value
-            // which will never be committed.
             return Long.MAX_VALUE;
         }
 
         ObjectSerializationCache serializationCache = new ObjectSerializationCache();
-        int batchSize = 0;
-        for (T record : records) {
-            batchSize += serde.recordSize(record, serializationCache);
-        }
-
-        if (batchSize > maxBatchSize) {
-            throw new IllegalArgumentException("The total size of " + records + " is " + batchSize +
-                ", which exceeds the maximum allowed batch size of " + maxBatchSize);
-        }
+        int[] recordSizes = records
+            .stream()
+            .mapToInt(record -> serde.recordSize(record, serializationCache))
+            .toArray();
 
         appendLock.lock();
         try {
             maybeCompleteDrain();
 
-            BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
-            if (batch == null) {
-                return null;
-            }
-
-            // Restart the linger timer if necessary
-            if (!lingerTimer.isRunning()) {
-                lingerTimer.reset(time.milliseconds() + lingerMs);
+            BatchBuilder<T> batch = null;
+            if (isAtomic) {
+                batch = maybeAllocateBatch(recordSizes);
             }
 
             for (T record : records) {
+                if (!isAtomic) {
+                    batch = maybeAllocateBatch(
+                        new int[] {serde.recordSize(record, serializationCache)}
+                    );
+                }
+
+                if (batch == null) {
+                    return null;
+                }
+
                 batch.appendRecord(record, serializationCache);
                 nextOffset += 1;
             }
 
+            maybeResetLinger();
+
             return nextOffset - 1;
         } finally {
             appendLock.unlock();
         }
     }
 
-    private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
-        if (currentBatch == null) {
+    private void maybeResetLinger() {
+        if (!lingerTimer.isRunning()) {
+            lingerTimer.reset(time.milliseconds() + lingerMs);
+        }
+    }
+
+    private BatchBuilder<T> maybeAllocateBatch(int[] recordSizes) {
+        int bytesNeeded = BatchBuilder.batchSizeForRecordSizes(

Review comment:
       The call to `batchSizeForRecordSizes` delegates to `bytesNeededForRecordSizes`, which does a full pass over the record sizes and adds the size of the header. Then below `hasRoomFor` makes another call to `bytesNeededForRecordSizes`. Can we avoid the re-computation?
   
   In general, I wonder if we can be a little looser about the batch size limit and avoid some of the extra work. When compression is enabled, these are just estimates anyway. What if we treat the batch size limit as the maximum batch size before compression and ignore overhead? Basically we can set max.message.bytes to Int.MaxValue and just enforce the limit here on the pre-compression bytes.
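
One way to avoid the second pass is to compute the size once and hand the result to the room check. The sketch below assumes the looser interpretation suggested above, where the limit applies to the pre-compression record bytes and header overhead is ignored. `RoomCheckSketch` and its members are hypothetical and do not mirror the actual `BatchBuilder` API.

```java
// Hypothetical sketch; names are not taken from the Kafka code base.
public class RoomCheckSketch {
    private final long maxBytes;
    private long usedBytes = 0;

    public RoomCheckSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Single pass over the record sizes (pre-compression, no header overhead).
    public static long bytesNeeded(int[] recordSizes) {
        long total = 0;
        for (int size : recordSizes) {
            total += size;
        }
        return total;
    }

    // Takes the precomputed size instead of the records, so nothing is recomputed.
    public boolean hasRoomFor(long bytesNeeded) {
        return bytesNeeded <= maxBytes - usedBytes;
    }

    public boolean tryAppend(int[] recordSizes) {
        long needed = bytesNeeded(recordSizes); // computed exactly once
        if (!hasRoomFor(needed)) {
            return false;
        }
        usedBytes += needed;
        return true;
    }
}
```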

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,67 +80,110 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.

Review comment:
       typo: `recors`







[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578082861



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -77,6 +79,29 @@ default void handleResign() {}
      */
     void register(Listener<T> listener);
 
+    /**
+     * Append a list of records to the log. The write will be scheduled for some time
+     * in the future. There is no guarantee that appended records will be written to
+     * the log and eventually committed. While the order of the records is preserve, they can
+     * be appended to the log using one or more batches. This means that each record could
+     * be committed independently.

Review comment:
       Done.







[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r574215331



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,24 +80,69 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {

Review comment:
       @mumrah This is one way to merge these two methods. Let me know which one you prefer.
   
   https://gist.github.com/jsancio/06bdffc9f25450127b1dc730ebbd55f7







[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578870321



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##########
@@ -307,4 +309,38 @@ public int writeRecord(
         recordOutput.writeVarint(0);
         return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
+
+    private int batchHeaderSizeInBytes() {
+        return AbstractRecords.recordBatchHeaderSizeInBytes(
+            RecordBatch.MAGIC_VALUE_V2,
+            compressionType
+        );
+    }
+
+    private int bytesNeededForRecords(
+        Collection<T> records,
+        ObjectSerializationCache serializationCache
+    ) {
+        long expectedNextOffset = nextOffset;
+        int bytesNeeded = 0;
+        for (T record : records) {
+            if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;

Review comment:
       Since we are handling this case by raising an exception, is it worth checking for overflow of `bytesNeeded` as well?







[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578886362



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##########
@@ -307,4 +309,38 @@ public int writeRecord(
         recordOutput.writeVarint(0);
         return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
+
+    private int batchHeaderSizeInBytes() {
+        return AbstractRecords.recordBatchHeaderSizeInBytes(
+            RecordBatch.MAGIC_VALUE_V2,
+            compressionType
+        );
+    }
+
+    private int bytesNeededForRecords(
+        Collection<T> records,
+        ObjectSerializationCache serializationCache
+    ) {
+        long expectedNextOffset = nextOffset;
+        int bytesNeeded = 0;
+        for (T record : records) {
+            if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;

Review comment:
       Updated it to use `Math.addExact`.
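
For context, `Math.addExact` throws an `ArithmeticException` when an `int` sum overflows, instead of silently wrapping around to a negative value. A minimal sketch of that pattern follows; `OverflowCheckSketch` and `totalBytes` are hypothetical names and this is not the updated `bytesNeededForRecords`.

```java
// Hypothetical sketch; names are not taken from the Kafka code base.
public class OverflowCheckSketch {
    // Sums the record sizes and fails fast on int overflow.
    public static int totalBytes(int[] recordSizes) {
        int bytesNeeded = 0;
        for (int size : recordSizes) {
            // Throws ArithmeticException rather than wrapping to a negative value.
            bytesNeeded = Math.addExact(bytesNeeded, size);
        }
        return bytesNeeded;
    }
}
```

Calling `totalBytes(new int[] {Integer.MAX_VALUE, 1})` throws, whereas a plain `+=` would produce `Integer.MIN_VALUE`.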







[GitHub] [kafka] jsancio commented on pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#issuecomment-781778478


   I ran the following commands:
   ```
   ./gradlew -version
   
   ------------------------------------------------------------
   Gradle 6.8.1
   ------------------------------------------------------------
   
   Build time:   2021-01-22 13:20:08 UTC
   Revision:     31f14a87d93945024ab7a78de84102a3400fa5b2
   
   Kotlin:       1.4.20
   Groovy:       2.5.12
   Ant:          Apache Ant(TM) version 1.10.9 compiled on September 27 2020
   JVM:          1.8.0_282 (Private Build 25.282-b08)
   OS:           Linux 5.8.0-7642-generic amd64
   
   ./gradlew -PscalaVersion=2.12 clean compileJava compileScala compileTestJava compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
   
   ./gradlew -PscalaVersion=2.13 clean compileJava compileScala compileTestJava compileTestScala spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat --profile --no-daemon --continue -PxmlSpotBugsReport=true
   
   ./gradlew -PscalaVersion=2.12 unitTest integrationTest --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed -PignoreFailures=true -PmaxTestRetries=1 -PmaxTestRetryFailures=5
   ```





[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578870759



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -298,28 +342,30 @@ public void close() {
         public final List<T> records;
         public final MemoryRecords data;
         private final MemoryPool pool;
-        private final ByteBuffer buffer;
+        // Buffer that was allocated by the MemoryPool (pool). This may not be the buffer used in
+        // the MemoryRecords (data) object.
+        private final ByteBuffer pooledBuffer;
 
         private CompletedBatch(
             long baseOffset,
             List<T> records,
             MemoryRecords data,
             MemoryPool pool,
-            ByteBuffer buffer
+            ByteBuffer pooledBuffer

Review comment:
       nit: it might be worth using the same name in `BatchBuilder`, which currently uses `initialBuffer`.

##########
File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##########
@@ -164,47 +170,84 @@ public void testUnflushedBuffersReleasedByClose() {
 
     @Test
     public void testSingleBatchAccumulation() {
-        int leaderEpoch = 17;
-        long baseOffset = 157;
-        int lingerMs = 50;
-        int maxBatchSize = 512;
-
-        Mockito.when(memoryPool.tryAllocate(maxBatchSize))
-            .thenReturn(ByteBuffer.allocate(maxBatchSize));
-
-        BatchAccumulator<String> acc = buildAccumulator(
-            leaderEpoch,
-            baseOffset,
-            lingerMs,
-            maxBatchSize
-        );
-
-        List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
-        assertEquals(baseOffset, acc.append(leaderEpoch, records.subList(0, 1)));
-        assertEquals(baseOffset + 2, acc.append(leaderEpoch, records.subList(1, 3)));
-        assertEquals(baseOffset + 5, acc.append(leaderEpoch, records.subList(3, 6)));
-        assertEquals(baseOffset + 7, acc.append(leaderEpoch, records.subList(6, 8)));
-        assertEquals(baseOffset + 8, acc.append(leaderEpoch, records.subList(8, 9)));
-
-        time.sleep(lingerMs);
-        assertTrue(acc.needsDrain(time.milliseconds()));
-
-        List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
-        assertEquals(1, batches.size());
-        assertFalse(acc.needsDrain(time.milliseconds()));
-        assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
-
-        BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
-        assertEquals(records, batch.records);
-        assertEquals(baseOffset, batch.baseOffset);
+        asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+            int leaderEpoch = 17;
+            long baseOffset = 157;
+            int lingerMs = 50;
+            int maxBatchSize = 512;
+
+            Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+            BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            );
+
+            List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
+            assertEquals(baseOffset, appender.call(acc, leaderEpoch, records.subList(0, 1)));
+            assertEquals(baseOffset + 2, appender.call(acc, leaderEpoch, records.subList(1, 3)));
+            assertEquals(baseOffset + 5, appender.call(acc, leaderEpoch, records.subList(3, 6)));
+            assertEquals(baseOffset + 7, appender.call(acc, leaderEpoch, records.subList(6, 8)));
+            assertEquals(baseOffset + 8, appender.call(acc, leaderEpoch, records.subList(8, 9)));
+
+            time.sleep(lingerMs);
+            assertTrue(acc.needsDrain(time.milliseconds()));
+
+            List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+            assertEquals(1, batches.size());
+            assertFalse(acc.needsDrain(time.milliseconds()));
+            assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
+
+            BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
+            assertEquals(records, batch.records);
+            assertEquals(baseOffset, batch.baseOffset);
+        });
     }
 
     @Test
     public void testMultipleBatchAccumulation() {
+        asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+            int leaderEpoch = 17;
+            long baseOffset = 157;
+            int lingerMs = 50;
+            int maxBatchSize = 256;
+
+            Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+            BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            );
+
+            // Append entries until we have 4 batches to drain (3 completed, 1 building)
+            while (acc.numCompletedBatches() < 3) {
+                appender.call(acc, leaderEpoch, singletonList("foo"));
+            }
+
+            List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+            assertEquals(4, batches.size());
+            assertTrue(batches.stream().allMatch(batch -> batch.data.sizeInBytes() <= maxBatchSize));
+        });
+    }
+
+    @Test void testRecordsAreSplit() {

Review comment:
       nit: it's a little weird for this test alone to have the annotation on the same line as the method definition







[GitHub] [kafka] mumrah commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r574622499



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,24 +80,69 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {

Review comment:
       The gist looks pretty reasonable 👍 







[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578085923



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,67 +80,110 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {
+        return append(epoch, records, false);
+    }
+
+    /**
+     * Append a list of records into an atomic batch. We guarantee all records are included in the
+     * same underlying record batch so that either all of the records become committed or none of
+     * them do.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in a batch
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
+     *         batch size; if this exception is throw none of the elements in records were
+     *         committed
+     */
+    public Long appendAtomic(int epoch, List<T> records) {
+        return append(epoch, records, true);
+    }
+
+    private Long append(int epoch, List<T> records, boolean isAtomic) {
         if (epoch != this.epoch) {
-            // If the epoch does not match, then the state machine probably
-            // has not gotten the notification about the latest epoch change.
-            // In this case, ignore the append and return a large offset value
-            // which will never be committed.
             return Long.MAX_VALUE;
         }
 
         ObjectSerializationCache serializationCache = new ObjectSerializationCache();
-        int batchSize = 0;
-        for (T record : records) {
-            batchSize += serde.recordSize(record, serializationCache);
-        }
-
-        if (batchSize > maxBatchSize) {
-            throw new IllegalArgumentException("The total size of " + records + " is " + batchSize +
-                ", which exceeds the maximum allowed batch size of " + maxBatchSize);
-        }
+        int[] recordSizes = records
+            .stream()
+            .mapToInt(record -> serde.recordSize(record, serializationCache))
+            .toArray();
 
         appendLock.lock();
         try {
             maybeCompleteDrain();
 
-            BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
-            if (batch == null) {
-                return null;
-            }
-
-            // Restart the linger timer if necessary
-            if (!lingerTimer.isRunning()) {
-                lingerTimer.reset(time.milliseconds() + lingerMs);
+            BatchBuilder<T> batch = null;
+            if (isAtomic) {
+                batch = maybeAllocateBatch(recordSizes);
             }
 
             for (T record : records) {
+                if (!isAtomic) {
+                    batch = maybeAllocateBatch(
+                        new int[] {serde.recordSize(record, serializationCache)}
+                    );
+                }
+
+                if (batch == null) {
+                    return null;
+                }
+
                 batch.appendRecord(record, serializationCache);
                 nextOffset += 1;
             }
 
+            maybeResetLinger();
+
             return nextOffset - 1;
         } finally {
             appendLock.unlock();
         }
     }
 
-    private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
-        if (currentBatch == null) {
+    private void maybeResetLinger() {
+        if (!lingerTimer.isRunning()) {
+            lingerTimer.reset(time.milliseconds() + lingerMs);
+        }
+    }
+
+    private BatchBuilder<T> maybeAllocateBatch(int[] recordSizes) {
+        int bytesNeeded = BatchBuilder.batchSizeForRecordSizes(

Review comment:
       We discussed this offline. For this PR we decided to avoid computing the sizes twice.
   
   In the future we want to avoid strict enforcement of batch sizes. We can do that if we add support in the Raft client or controller for applying a set of records atomically even if they are not in the same batch.







[GitHub] [kafka] jsancio commented on pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#issuecomment-781790693


   The following tests failed:
   ```
   core:
   
       ConfigCommandTest. shouldFailIfUnresolvableHost()
       ConfigCommandTest. shouldFailIfUnresolvableHost()
       ConfigCommandTest. shouldFailIfUnresolvableHostUsingZookeeper()
       ConfigCommandTest. shouldFailIfUnresolvableHostUsingZookeeper()
       DynamicConfigChangeTest. testIpHandlerUnresolvableAddress()
       DynamicConfigChangeTest. testIpHandlerUnresolvableAddress()
       DynamicConfigTest. shouldFailIpConfigsWithBadHost()
       DynamicConfigTest. shouldFailIpConfigsWithBadHost()
   ```





[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578675850



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,70 +82,111 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * records will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {
+        return append(epoch, records, false);
+    }
+
+    /**
+     * Append a list of records into an atomic batch. We guarantee all records are included in the
+     * same underlying record batch so that either all of the records become committed or none of
+     * them do.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in a batch
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
+     *         batch size; if this exception is throw none of the elements in records were
+     *         committed
+     */
+    public Long appendAtomic(int epoch, List<T> records) {
+        return append(epoch, records, true);
+    }
+
+    private Long append(int epoch, List<T> records, boolean isAtomic) {
         if (epoch != this.epoch) {
-            // If the epoch does not match, then the state machine probably
-            // has not gotten the notification about the latest epoch change.
-            // In this case, ignore the append and return a large offset value
-            // which will never be committed.
             return Long.MAX_VALUE;
         }
 
         ObjectSerializationCache serializationCache = new ObjectSerializationCache();
-        int batchSize = 0;
-        for (T record : records) {
-            batchSize += serde.recordSize(record, serializationCache);
-        }
-
-        if (batchSize > maxBatchSize) {
-            throw new IllegalArgumentException("The total size of " + records + " is " + batchSize +
-                ", which exceeds the maximum allowed batch size of " + maxBatchSize);
-        }
 
         appendLock.lock();
         try {
             maybeCompleteDrain();
 
-            BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
-            if (batch == null) {
-                return null;
-            }
-
-            // Restart the linger timer if necessary
-            if (!lingerTimer.isRunning()) {
-                lingerTimer.reset(time.milliseconds() + lingerMs);
+            BatchBuilder<T> batch = null;
+            if (isAtomic) {
+                batch = maybeAllocateBatch(records, serializationCache);
             }
 
             for (T record : records) {
+                if (!isAtomic) {
+                    batch = maybeAllocateBatch(Collections.singleton(record), serializationCache);
+                }
+
+                if (batch == null) {
+                    return null;
+                }
+
                 batch.appendRecord(record, serializationCache);
                 nextOffset += 1;
             }
 
+            maybeResetLinger();
+
             return nextOffset - 1;
         } finally {
             appendLock.unlock();
         }
     }
 
-    private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
+    private void maybeResetLinger() {
+        if (!lingerTimer.isRunning()) {
+            lingerTimer.reset(time.milliseconds() + lingerMs);
+        }
+    }
+
+    private BatchBuilder<T> maybeAllocateBatch(
+        Collection<T> records,
+        ObjectSerializationCache serializationCache
+    ) {
         if (currentBatch == null) {
             startNewBatch();
-        } else if (!currentBatch.hasRoomFor(batchSize)) {
-            completeCurrentBatch();
-            startNewBatch();
         }
+
+        if (currentBatch != null) {
+            OptionalInt bytesNeeded = currentBatch.roomFor(records, serializationCache);

Review comment:
       So `roomFor` returns the bytes needed to accommodate the records when there is not enough room in the current batch. Perhaps an alternative name might be `overflowBytes`, or maybe `bytesNeeded` to match the variable name here?
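
To make the naming question concrete, here is a minimal sketch of the contract being described: an empty `OptionalInt` when the record fits in the current batch, otherwise the number of bytes a fresh batch would need. Everything in it is an assumption for illustration, including the class `BatchRoomSketch`, its fields, and the choice to add a fixed header size to the result. It is not the `BatchBuilder` implementation from this PR.

```java
import java.util.OptionalInt;

// Hypothetical stand-in for a batch builder; names and layout are assumptions.
final class BatchRoomSketch {
    private final int maxBytes;    // total capacity of one batch, header included
    private final int headerBytes; // fixed per-batch overhead (illustrative value)
    private int usedBytes;         // bytes written so far, header included

    BatchRoomSketch(int maxBytes, int headerBytes) {
        this.maxBytes = maxBytes;
        this.headerBytes = headerBytes;
        this.usedBytes = headerBytes;
    }

    /**
     * Returns empty if the record fits in the remaining space; otherwise
     * returns the bytes a new batch would need to hold it (header + record).
     */
    OptionalInt bytesNeeded(int recordBytes) {
        if (recordBytes <= maxBytes - usedBytes) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(Math.addExact(headerBytes, recordBytes));
    }

    /** Appends a record that is known to fit. */
    void append(int recordBytes) {
        if (bytesNeeded(recordBytes).isPresent()) {
            throw new IllegalStateException("record does not fit in this batch");
        }
        usedBytes += recordBytes;
    }

    public static void main(String[] args) {
        BatchRoomSketch batch = new BatchRoomSketch(100, 10);
        batch.append(80);
        System.out.println(batch.bytesNeeded(10).isPresent());
        System.out.println(batch.bytesNeeded(20));
    }
}
```

With this shape, a name such as `bytesNeeded` reads naturally at the call site: a present value tells the caller how large an allocation to request for the next batch.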

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
##########
@@ -307,4 +309,38 @@ public int writeRecord(
         recordOutput.writeVarint(0);
         return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
+
+    private int batchHeaderSizeInBytes() {
+        return AbstractRecords.recordBatchHeaderSizeInBytes(
+            RecordBatch.MAGIC_VALUE_V2,
+            compressionType
+        );
+    }
+
+    private int bytesNeededForRecords(
+        Collection<T> records,
+        ObjectSerializationCache serializationCache
+    ) {
+        long expectedNextOffset = nextOffset;
+        int bytesNeeded = 0;
+        for (T record : records) {
+            if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;

Review comment:
       Hmm.. If we return `Integer.MAX_VALUE`, then that will prevent us from being able to split when the number of records exceeds `Integer.MAX_VALUE`. Similarly, it might be worth hardening this logic by ensuring `bytesNeeded` does not overflow.  These cases are unlikely in practice, so we can save them for a follow-up if there is no clear way to handle them right now.







[GitHub] [kafka] jsancio commented on pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#issuecomment-777182668


   @mumrah I found a few more issues with how we compute the needed sizes. I need to fix one of the remaining issues and write tests for them. Let's assume that this PR is still a WIP.





[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r574069573



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,24 +80,69 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {
         if (epoch != this.epoch) {
-            // If the epoch does not match, then the state machine probably
-            // has not gotten the notification about the latest epoch change.
-            // In this case, ignore the append and return a large offset value
-            // which will never be committed.
+            return Long.MAX_VALUE;
+        }
+
+        ObjectSerializationCache serializationCache = new ObjectSerializationCache();
+
+        appendLock.lock();
+        try {
+            maybeCompleteDrain();
+
+            for (T record : records) {
+                BatchBuilder<T> batch = maybeAllocateBatch(

Review comment:
       No. `maybeAllocateBatch` should only create a new batch if the record, whose serialized size is `serde.recordSize(record, serializationCache)`, doesn't fit in the current batch being built. We also don't run the risk of the polling thread draining the batch too early, because this method holds the lock for the whole iteration.
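
The behavior described above can be sketched as follows. This is a simplified model rather than the PR's `BatchAccumulator`: the class `SplittingAccumulatorSketch` is hypothetical, records are plain strings, and `String.length()` stands in for `serde.recordSize`. It shows the two points made in the comment: a new batch is started only when the next record does not fit, and the lock is held across the whole loop so a concurrent drain cannot observe a half-appended list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of a splitting accumulator.
final class SplittingAccumulatorSketch {
    private final int maxBatchBytes;
    private final ReentrantLock appendLock = new ReentrantLock();
    private final List<List<String>> completed = new ArrayList<>();
    private List<String> current = new ArrayList<>();
    private int currentBytes = 0;

    SplittingAccumulatorSketch(int maxBatchBytes) {
        this.maxBatchBytes = maxBatchBytes;
    }

    /** Appends records in order, using as many batches as necessary. */
    void append(List<String> records) {
        appendLock.lock();
        try {
            for (String record : records) {
                int size = record.length(); // stand-in for serde.recordSize
                if (size > maxBatchBytes) {
                    // Earlier records in this call have already been appended.
                    throw new IllegalArgumentException("record larger than max batch size");
                }
                if (currentBytes + size > maxBatchBytes) {
                    // Record does not fit: complete the current batch, start a new one.
                    completed.add(current);
                    current = new ArrayList<>();
                    currentBytes = 0;
                }
                current.add(record);
                currentBytes += size;
            }
        } finally {
            appendLock.unlock();
        }
    }

    /** Returns all batches, including the one still being built, and resets state. */
    List<List<String>> drain() {
        appendLock.lock();
        try {
            List<List<String>> result = new ArrayList<>(completed);
            if (!current.isEmpty()) {
                result.add(current);
            }
            completed.clear();
            current = new ArrayList<>();
            currentBytes = 0;
            return result;
        } finally {
            appendLock.unlock();
        }
    }

    public static void main(String[] args) {
        SplittingAccumulatorSketch acc = new SplittingAccumulatorSketch(4);
        acc.append(Arrays.asList("aa", "bb", "cc"));
        System.out.println(acc.drain());
    }
}
```

Because `drain` takes the same lock, it sees either none or all of the records from a single `append` call, even though those records may end up in different batches.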







[GitHub] [kafka] jsancio commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r576395637



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,24 +80,69 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {

Review comment:
       Done. Implemented the Gist.







[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r577225891



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,67 +80,110 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * records will get committed while others will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is thrown, some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {
+        return append(epoch, records, false);
+    }
+
+    /**
+     * Append a list of records into an atomic batch. We guarantee all records are included in the
+     * same underlying record batch so that either all of the records become committed or none of
+     * them do.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in a batch
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
+     *         batch size; if this exception is thrown, none of the elements in records were
+     *         committed
+     */
+    public Long appendAtomic(int epoch, List<T> records) {
+        return append(epoch, records, true);
+    }
+
+    private Long append(int epoch, List<T> records, boolean isAtomic) {
         if (epoch != this.epoch) {
-            // If the epoch does not match, then the state machine probably
-            // has not gotten the notification about the latest epoch change.
-            // In this case, ignore the append and return a large offset value
-            // which will never be committed.
             return Long.MAX_VALUE;
         }
 
         ObjectSerializationCache serializationCache = new ObjectSerializationCache();
-        int batchSize = 0;
-        for (T record : records) {
-            batchSize += serde.recordSize(record, serializationCache);
-        }
-
-        if (batchSize > maxBatchSize) {
-            throw new IllegalArgumentException("The total size of " + records + " is " + batchSize +
-                ", which exceeds the maximum allowed batch size of " + maxBatchSize);
-        }
+        int[] recordSizes = records
+            .stream()
+            .mapToInt(record -> serde.recordSize(record, serializationCache))
+            .toArray();
 
         appendLock.lock();
         try {
             maybeCompleteDrain();
 
-            BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
-            if (batch == null) {
-                return null;
-            }
-
-            // Restart the linger timer if necessary
-            if (!lingerTimer.isRunning()) {
-                lingerTimer.reset(time.milliseconds() + lingerMs);
+            BatchBuilder<T> batch = null;
+            if (isAtomic) {
+                batch = maybeAllocateBatch(recordSizes);
             }
 
             for (T record : records) {
+                if (!isAtomic) {
+                    batch = maybeAllocateBatch(
+                        new int[] {serde.recordSize(record, serializationCache)}
+                    );
+                }
+
+                if (batch == null) {
+                    return null;
+                }
+
                 batch.appendRecord(record, serializationCache);
                 nextOffset += 1;
             }
 
+            maybeResetLinger();
+
             return nextOffset - 1;
         } finally {
             appendLock.unlock();
         }
     }
 
-    private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
-        if (currentBatch == null) {
+    private void maybeResetLinger() {
+        if (!lingerTimer.isRunning()) {
+            lingerTimer.reset(time.milliseconds() + lingerMs);
+        }
+    }
+
+    private BatchBuilder<T> maybeAllocateBatch(int[] recordSizes) {
+        int bytesNeeded = BatchBuilder.batchSizeForRecordSizes(

Review comment:
       The call to `batchSizeForRecordSizes` delegates to `bytesNeededForRecordSizes`, which does a full pass over the record sizes and adds the size of the header. Then below `hasRoomFor` makes another call to `bytesNeededForRecordSizes`. Can we avoid the re-computation?
   
   In general, I wonder if we can be a little looser about the batch size limit and avoid some of the extra work. When compression is enabled, these sizes are only estimates anyway. What if we treat the batch size limit as the maximum batch size before compression and ignore the batch overhead? Basically, we could set `max.message.bytes` to `Int.MaxValue` and enforce the limit here on the pre-compression bytes alone.
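
A minimal sketch of the idea in this comment, not the code in the PR: sum the record sizes once, reuse that number for both the "too large" check and the "has room" check, and compare only pre-compression record bytes against the limit. The class `BatchSizeSketch` and all of its members are hypothetical names made up for illustration; they are not part of `BatchAccumulator` or `BatchBuilder`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Kafka.
final class BatchSizeSketch {
    // Limit on pre-compression record bytes; header overhead is ignored on purpose.
    private final int maxBatchSize;
    private final List<Integer> closedBatchSizes = new ArrayList<>();
    private int currentBatchBytes = 0;
    private int currentBatchRecords = 0;

    BatchSizeSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Sums the record sizes in a single pass; callers reuse the result.
    static int sumSizes(int[] recordSizes) {
        int total = 0;
        for (int size : recordSizes) {
            total = Math.addExact(total, size);
        }
        return total;
    }

    // Places all records in one batch, or throws if they can never fit.
    void appendAtomic(int[] recordSizes) {
        int bytesNeeded = sumSizes(recordSizes); // computed once, used twice below
        if (bytesNeeded > maxBatchSize) {
            throw new IllegalArgumentException("Records need " + bytesNeeded
                + " bytes, which exceeds the limit of " + maxBatchSize);
        }
        if (currentBatchBytes + bytesNeeded > maxBatchSize) {
            closeCurrentBatch(); // no room left: start a new batch
        }
        currentBatchBytes += bytesNeeded;
        currentBatchRecords += recordSizes.length;
    }

    // Moves the current batch, if it holds any records, to the closed list.
    void closeCurrentBatch() {
        if (currentBatchRecords > 0) {
            closedBatchSizes.add(currentBatchBytes);
            currentBatchBytes = 0;
            currentBatchRecords = 0;
        }
    }

    List<Integer> closedBatchSizes() {
        return closedBatchSizes;
    }

    int currentBatchBytes() {
        return currentBatchBytes;
    }
}
```

The trade-off is that the limit becomes approximate: the bytes actually written can exceed it by the header overhead, or fall well under it once compression is applied.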



