You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/17 00:07:12 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10063: KAFKA-12258: Add support for splitting appending records

hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r577216920



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -77,6 +79,29 @@ default void handleResign() {}
      */
     void register(Listener<T> listener);
 
+    /**
+     * Append a list of records to the log. The write will be scheduled for some time
+     * in the future. There is no guarantee that appended records will be written to
+     * the log and eventually committed. While the order of the records is preserve, they can
+     * be appended to the log using one or more batches. This means that each record could
+     * be committed independently.

Review comment:
       It might already be clear enough given the previous sentence, but maybe we could emphasize that if any record becomes committed, then all records ordered before it are guaranteed to be committed as well.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,67 +80,110 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is throw some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {
+        return append(epoch, records, false);
+    }
+
+    /**
+     * Append a list of records into an atomic batch. We guarantee all records are included in the
+     * same underlying record batch so that either all of the records become committed or none of
+     * them do.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in a batch
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
+     *         batch size; if this exception is throw none of the elements in records were
+     *         committed
+     */
+    public Long appendAtomic(int epoch, List<T> records) {
+        return append(epoch, records, true);
+    }
+
+    private Long append(int epoch, List<T> records, boolean isAtomic) {
         if (epoch != this.epoch) {
-            // If the epoch does not match, then the state machine probably
-            // has not gotten the notification about the latest epoch change.
-            // In this case, ignore the append and return a large offset value
-            // which will never be committed.
             return Long.MAX_VALUE;
         }
 
         ObjectSerializationCache serializationCache = new ObjectSerializationCache();
-        int batchSize = 0;
-        for (T record : records) {
-            batchSize += serde.recordSize(record, serializationCache);
-        }
-
-        if (batchSize > maxBatchSize) {
-            throw new IllegalArgumentException("The total size of " + records + " is " + batchSize +
-                ", which exceeds the maximum allowed batch size of " + maxBatchSize);
-        }
+        int[] recordSizes = records
+            .stream()
+            .mapToInt(record -> serde.recordSize(record, serializationCache))
+            .toArray();
 
         appendLock.lock();
         try {
             maybeCompleteDrain();
 
-            BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
-            if (batch == null) {
-                return null;
-            }
-
-            // Restart the linger timer if necessary
-            if (!lingerTimer.isRunning()) {
-                lingerTimer.reset(time.milliseconds() + lingerMs);
+            BatchBuilder<T> batch = null;
+            if (isAtomic) {
+                batch = maybeAllocateBatch(recordSizes);
             }
 
             for (T record : records) {
+                if (!isAtomic) {
+                    batch = maybeAllocateBatch(
+                        new int[] {serde.recordSize(record, serializationCache)}
+                    );
+                }
+
+                if (batch == null) {
+                    return null;
+                }
+
                 batch.appendRecord(record, serializationCache);
                 nextOffset += 1;
             }
 
+            maybeResetLinger();
+
             return nextOffset - 1;
         } finally {
             appendLock.unlock();
         }
     }
 
-    private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
-        if (currentBatch == null) {
+    private void maybeResetLinger() {
+        if (!lingerTimer.isRunning()) {
+            lingerTimer.reset(time.milliseconds() + lingerMs);
+        }
+    }
+
+    private BatchBuilder<T> maybeAllocateBatch(int[] recordSizes) {
+        int bytesNeeded = BatchBuilder.batchSizeForRecordSizes(

Review comment:
       The call to `batchSizeForRecordSizes` delegates to `bytesNeededForRecordSizes`, which does a full pass over the record sizes and adds the size of the header. Then below `hasRoomFor` makes another call to `bytesNeededForRecordSizes`. Can we avoid the re-computation?
   
   In general, I wonder if we can be a little looser about the batch size limit and avoid some of the extra work. When compression is enabled, these are just estimates anyway. What if we treat the batch size limit as the maximum batch size before compression and ignore overhead. Basically we can set max.message.bytes to Int.MaxValue and just enforce the limit here on the pre-compression bytes.

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -79,67 +80,110 @@ public BatchAccumulator(
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * recors will get committed while other will not when the leader fails.

Review comment:
       typo: `recors`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org