You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/02/19 19:32:13 UTC

[kafka] branch 2.8 updated (e39bb7a -> 0b4aa2b)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from e39bb7a  MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
     new dd8a139  KAFKA-12258; Add support for splitting appending records (#10063)
     new 0b4aa2b  KAFKA-12332; Error partitions from topics with invalid IDs in LISR requests (#10143)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...tion.java => InconsistentTopicIdException.java} |   4 +-
 .../org/apache/kafka/common/protocol/Errors.java   |   4 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |  40 +++++-
 .../main/scala/kafka/server/ReplicaManager.scala   |  31 ++--
 .../unit/kafka/server/ReplicaManagerTest.scala     |  62 +++++++-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  25 +++-
 .../java/org/apache/kafka/raft/RaftClient.java     |  48 +++++--
 .../kafka/raft/internals/BatchAccumulator.java     | 124 +++++++++++-----
 .../apache/kafka/raft/internals/BatchBuilder.java  | 102 +++++++++----
 .../kafka/raft/internals/BatchAccumulatorTest.java | 159 ++++++++++++++++-----
 .../kafka/raft/internals/BatchBuilderTest.java     |   9 +-
 11 files changed, 448 insertions(+), 160 deletions(-)
 copy clients/src/main/java/org/apache/kafka/common/errors/{UnknownTopicIdException.java => InconsistentTopicIdException.java} (87%)


[kafka] 01/02: KAFKA-12258; Add support for splitting appending records (#10063)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit dd8a13996f33c45fb3bb66a3c4bc6b918b30f94d
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Thu Feb 18 19:46:23 2021 -0800

    KAFKA-12258; Add support for splitting appending records (#10063)
    
    1. Type `BatchAccumulator`. Add support for appending records into one or more batches.
    2. Type `RaftClient`. Rename `scheduleAppend` to `scheduleAtomicAppend`.
    3. Type `RaftClient`. Add a new method `scheduleAppend` which appends records to the log using as many batches as necessary.
    4. Increase the batch size from 1MB to 8MB.
    
    Reviewers: David Arthur <mu...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  25 +++-
 .../java/org/apache/kafka/raft/RaftClient.java     |  48 +++++--
 .../kafka/raft/internals/BatchAccumulator.java     | 124 +++++++++++-----
 .../apache/kafka/raft/internals/BatchBuilder.java  | 102 +++++++++----
 .../kafka/raft/internals/BatchAccumulatorTest.java | 159 ++++++++++++++++-----
 .../kafka/raft/internals/BatchBuilderTest.java     |   9 +-
 6 files changed, 339 insertions(+), 128 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index a5c6c6c..49aabd3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -134,7 +134,7 @@ import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
 public class KafkaRaftClient<T> implements RaftClient<T> {
     private static final int RETRY_BACKOFF_BASE_MS = 100;
     private static final int FETCH_MAX_WAIT_MS = 1000;
-    static final int MAX_BATCH_SIZE = 1024 * 1024;
+    static final int MAX_BATCH_SIZE = 8 * 1024 * 1024;
 
     private final AtomicReference<GracefulShutdown> shutdown = new AtomicReference<>();
     private final Logger logger;
@@ -2181,13 +2181,27 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
     @Override
     public Long scheduleAppend(int epoch, List<T> records) {
+        return append(epoch, records, false);
+    }
+
+    @Override
+    public Long scheduleAtomicAppend(int epoch, List<T> records) {
+        return append(epoch, records, true);
+    }
+
+    private Long append(int epoch, List<T> records, boolean isAtomic) {
         BatchAccumulator<T> accumulator = this.accumulator;
         if (accumulator == null) {
             return Long.MAX_VALUE;
         }
 
         boolean isFirstAppend = accumulator.isEmpty();
-        Long offset = accumulator.append(epoch, records);
+        final Long offset;
+        if (isAtomic) {
+            offset = accumulator.appendAtomic(epoch, records);
+        } else {
+            offset = accumulator.append(epoch, records);
+        }
 
         // Wakeup the network channel if either this is the first append
         // or the accumulator is ready to drain now. Checking for the first
@@ -2344,9 +2358,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
         /**
          * This API is used for committed records originating from {@link #scheduleAppend(int, List)}
-         * on this instance. In this case, we are able to save the original record objects,
-         * which saves the need to read them back from disk. This is a nice optimization
-         * for the leader which is typically doing more work than all of the followers.
+         * or {@link #scheduleAtomicAppend(int, List)} on this instance. In this case, we are able to
+         * save the original record objects, which saves the need to read them back from disk. This is
+         * a nice optimization for the leader which is typically doing more work than all of the
+         * followers.
          */
         public void fireHandleCommit(long baseOffset, int epoch, List<T> records) {
             BatchReader.Batch<T> batch = new BatchReader.Batch<>(baseOffset, epoch, records);
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index e2bec0e..74488b4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -32,11 +32,13 @@ public interface RaftClient<T> extends Closeable {
          * after consuming the reader.
          *
          * Note that there is not a one-to-one correspondence between writes through
-         * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation
-         * is free to batch together the records from multiple append calls provided
-         * that batch boundaries are respected. This means that each batch specified
-         * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of
-         * a batch provided by the {@link BatchReader}.
+         * {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}
+         * and this callback. The Raft implementation is free to batch together the records
+         * from multiple append calls provided that batch boundaries are respected. Records
+         * specified through {@link #scheduleAtomicAppend(int, List)} are guaranteed to be a
+         * subset of a batch provided by the {@link BatchReader}. Records specified through
+         * {@link #scheduleAppend(int, List)} are guaranteed to be in the same order but
+         * they can map to any number of batches provided by the {@link BatchReader}.
          *
          * @param reader reader instance which must be iterated and closed
          */
@@ -48,7 +50,7 @@ public interface RaftClient<T> extends Closeable {
          * {@link #handleCommit(BatchReader)}.
          *
          * After becoming a leader, the client is eligible to write to the log
-         * using {@link #scheduleAppend(int, List)}.
+         * using {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}.
          *
          * @param epoch the claimed leader epoch
          */
@@ -87,6 +89,30 @@ public interface RaftClient<T> extends Closeable {
     /**
      * Append a list of records to the log. The write will be scheduled for some time
      * in the future. There is no guarantee that appended records will be written to
+     * the log and eventually committed. While the order of the records is preserved, they can
+     * be appended to the log using one or more batches. Each record may be committed independently.
+     * If a record is committed, then all records scheduled for append during this epoch
+     * and prior to this record are also committed.
+     *
+     * If the provided current leader epoch does not match the current epoch, which
+     * is possible when the state machine has yet to observe the epoch change, then
+     * this method will return {@link Long#MAX_VALUE} to indicate an offset which is
+     * not possible to become committed. The state machine is expected to discard all
+     * uncommitted entries after observing an epoch change.
+     *
+     * @param epoch the current leader epoch
+     * @param records the list of records to append
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records cannot
+     *         be committed; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
+     *         batch size; if this exception is thrown none of the elements in records were
+     *         committed
+     */
+    Long scheduleAppend(int epoch, List<T> records);
+
+    /**
+     * Append a list of records to the log. The write will be scheduled for some time
+     * in the future. There is no guarantee that appended records will be written to
      * the log and eventually committed. However, it is guaranteed that if any of the
      * records become committed, then all of them will be.
      *
@@ -98,11 +124,13 @@ public interface RaftClient<T> extends Closeable {
      *
      * @param epoch the current leader epoch
      * @param records the list of records to append
-     * @return the offset within the current epoch that the log entries will be appended,
-     *         or null if the leader was unable to accept the write (e.g. due to memory
-     *         being reached).
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the records cannot
+     *         be committed; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
+     *         batch size; if this exception is thrown none of the elements in records were
+     *         committed
      */
-    Long scheduleAppend(int epoch, List<T> records);
+    Long scheduleAtomicAppend(int epoch, List<T> records);
 
     /**
      * Attempt a graceful shutdown of the client. This allows the leader to proactively
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 5331e4d..07d1015 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.raft.internals;
 
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.record.CompressionType;
@@ -26,8 +27,10 @@ import org.apache.kafka.raft.RecordSerde;
 import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalInt;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
@@ -79,70 +82,111 @@ public class BatchAccumulator<T> implements Closeable {
     }
 
     /**
-     * Append a list of records into an atomic batch. We guarantee all records
-     * are included in the same underlying record batch so that either all of
-     * the records become committed or none of them do.
+     * Append a list of records into as many batches as necessary.
      *
-     * @param epoch the expected leader epoch. If this does not match, then
-     *              {@link Long#MAX_VALUE} will be returned as an offset which
-     *              cannot become committed.
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record (which will be
-     *         {@link Long#MAX_VALUE} if the epoch does not match), or null if
-     *         no memory could be allocated for the batch at this time
+     * The order of the elements in the records argument will match the order in the batches.
+     * This method will use as many batches as necessary to serialize all of the records. Since
+     * this method can split the records into multiple batches it is possible that some of the
+     * records will get committed while others will not when the leader fails.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in the batches
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
+     *         batch size; if this exception is thrown some of the elements in records may have
+     *         been committed
      */
     public Long append(int epoch, List<T> records) {
+        return append(epoch, records, false);
+    }
+
+    /**
+     * Append a list of records into an atomic batch. We guarantee all records are included in the
+     * same underlying record batch so that either all of the records become committed or none of
+     * them do.
+     *
+     * @param epoch the expected leader epoch. If this does not match, then {@link Long#MAX_VALUE}
+     *              will be returned as an offset which cannot become committed
+     * @param records the list of records to include in a batch
+     * @return the expected offset of the last record; {@link Long#MAX_VALUE} if the epoch does not
+     *         match; null if no memory could be allocated for the batch at this time
+     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
+     *         batch size; if this exception is thrown none of the elements in records were
+     *         committed
+     */
+    public Long appendAtomic(int epoch, List<T> records) {
+        return append(epoch, records, true);
+    }
+
+    private Long append(int epoch, List<T> records, boolean isAtomic) {
         if (epoch != this.epoch) {
-            // If the epoch does not match, then the state machine probably
-            // has not gotten the notification about the latest epoch change.
-            // In this case, ignore the append and return a large offset value
-            // which will never be committed.
             return Long.MAX_VALUE;
         }
 
         ObjectSerializationCache serializationCache = new ObjectSerializationCache();
-        int batchSize = 0;
-        for (T record : records) {
-            batchSize += serde.recordSize(record, serializationCache);
-        }
-
-        if (batchSize > maxBatchSize) {
-            throw new IllegalArgumentException("The total size of " + records + " is " + batchSize +
-                ", which exceeds the maximum allowed batch size of " + maxBatchSize);
-        }
 
         appendLock.lock();
         try {
             maybeCompleteDrain();
 
-            BatchBuilder<T> batch = maybeAllocateBatch(batchSize);
-            if (batch == null) {
-                return null;
-            }
-
-            // Restart the linger timer if necessary
-            if (!lingerTimer.isRunning()) {
-                lingerTimer.reset(time.milliseconds() + lingerMs);
+            BatchBuilder<T> batch = null;
+            if (isAtomic) {
+                batch = maybeAllocateBatch(records, serializationCache);
             }
 
             for (T record : records) {
+                if (!isAtomic) {
+                    batch = maybeAllocateBatch(Collections.singleton(record), serializationCache);
+                }
+
+                if (batch == null) {
+                    return null;
+                }
+
                 batch.appendRecord(record, serializationCache);
                 nextOffset += 1;
             }
 
+            maybeResetLinger();
+
             return nextOffset - 1;
         } finally {
             appendLock.unlock();
         }
     }
 
-    private BatchBuilder<T> maybeAllocateBatch(int batchSize) {
+    private void maybeResetLinger() {
+        if (!lingerTimer.isRunning()) {
+            lingerTimer.reset(time.milliseconds() + lingerMs);
+        }
+    }
+
+    private BatchBuilder<T> maybeAllocateBatch(
+        Collection<T> records,
+        ObjectSerializationCache serializationCache
+    ) {
         if (currentBatch == null) {
             startNewBatch();
-        } else if (!currentBatch.hasRoomFor(batchSize)) {
-            completeCurrentBatch();
-            startNewBatch();
         }
+
+        if (currentBatch != null) {
+            OptionalInt bytesNeeded = currentBatch.bytesNeeded(records, serializationCache);
+            if (bytesNeeded.isPresent() && bytesNeeded.getAsInt() > maxBatchSize) {
+                throw new RecordBatchTooLargeException(
+                    String.format(
+                        "The total record(s) size of %s exceeds the maximum allowed batch size of %s",
+                        bytesNeeded.getAsInt(),
+                        maxBatchSize
+                    )
+                );
+            } else if (bytesNeeded.isPresent()) {
+                completeCurrentBatch();
+                startNewBatch();
+            }
+        }
+
         return currentBatch;
     }
 
@@ -298,20 +342,22 @@ public class BatchAccumulator<T> implements Closeable {
         public final List<T> records;
         public final MemoryRecords data;
         private final MemoryPool pool;
-        private final ByteBuffer buffer;
+        // Buffer that was allocated by the MemoryPool (pool). This may not be the buffer used in
+        // the MemoryRecords (data) object.
+        private final ByteBuffer initialBuffer;
 
         private CompletedBatch(
             long baseOffset,
             List<T> records,
             MemoryRecords data,
             MemoryPool pool,
-            ByteBuffer buffer
+            ByteBuffer initialBuffer
         ) {
             this.baseOffset = baseOffset;
             this.records = records;
             this.data = data;
             this.pool = pool;
-            this.buffer = buffer;
+            this.initialBuffer = initialBuffer;
         }
 
         public int sizeInBytes() {
@@ -319,7 +365,7 @@ public class BatchAccumulator<T> implements Closeable {
         }
 
         public void release() {
-            pool.release(buffer);
+            pool.release(initialBuffer);
         }
     }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
index 542bb51..c953b6a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
@@ -33,12 +33,14 @@ import org.apache.kafka.raft.RecordSerde;
 import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.OptionalInt;
 
 /**
  * Collect a set of records into a single batch. New records are added
  * through {@link #appendRecord(Object, ObjectSerializationCache)}, but the caller must first
- * check whether there is room using {@link #hasRoomFor(int)}. Once the
+ * check whether there is room using {@link #bytesNeeded(Collection, ObjectSerializationCache)}. Once the
  * batch is ready, then {@link #build()} should be used to get the resulting
  * {@link MemoryRecords} instance.
  *
@@ -85,8 +87,8 @@ public class BatchBuilder<T> {
         this.maxBytes = maxBytes;
         this.records = new ArrayList<>();
 
-        int batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(
-            RecordBatch.MAGIC_VALUE_V2, compressionType);
+        // field compressionType must be set before calculating the batch header size
+        int batchHeaderSizeInBytes = batchHeaderSizeInBytes();
         batchOutput.position(initialPosition + batchHeaderSizeInBytes);
 
         this.recordOutput = new DataOutputStreamWritable(new DataOutputStream(
@@ -95,7 +97,7 @@ public class BatchBuilder<T> {
 
     /**
      * Append a record to this patch. The caller must first verify there is room for the batch
-     * using {@link #hasRoomFor(int)}.
+     * using {@link #bytesNeeded(Collection, ObjectSerializationCache)}.
      *
      * @param record the record to append
      * @param serializationCache serialization cache for use in {@link RecordSerde#write(Object, ObjectSerializationCache, Writable)}
@@ -103,7 +105,7 @@ public class BatchBuilder<T> {
      */
     public long appendRecord(T record, ObjectSerializationCache serializationCache) {
         if (!isOpenForAppends) {
-            throw new IllegalArgumentException("Cannot append new records after the batch has been built");
+            throw new IllegalStateException("Cannot append new records after the batch has been built");
         }
 
         if (nextOffset - baseOffset > Integer.MAX_VALUE) {
@@ -123,39 +125,39 @@ public class BatchBuilder<T> {
     }
 
     /**
-     * Check whether the batch has enough room for a record of the given size in bytes.
+     * Check whether the batch has enough room for all the record values.
      *
-     * @param sizeInBytes the size of the record to be appended
-     * @return true if there is room for the record to be appended, false otherwise
+     * Returns an empty {@link OptionalInt} if the batch builder has room for this list of records.
+     * Otherwise it returns the expected number of bytes needed for a batch to contain these records.
+     *
+     * @param records the records to use when checking for room
+     * @param serializationCache serialization cache for computing sizes
+     * @return empty {@link OptionalInt} if there is room for the records to be appended, otherwise
+     *         returns the number of bytes needed
      */
-    public boolean hasRoomFor(int sizeInBytes) {
-        if (!isOpenForAppends) {
-            return false;
-        }
+    public OptionalInt bytesNeeded(Collection<T> records, ObjectSerializationCache serializationCache) {
+        int bytesNeeded = bytesNeededForRecords(
+            records,
+            serializationCache
+        );
 
-        if (nextOffset - baseOffset >= Integer.MAX_VALUE) {
-            return false;
+        if (!isOpenForAppends) {
+            return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
         }
 
-        int recordSizeInBytes = DefaultRecord.sizeOfBodyInBytes(
-            (int) (nextOffset - baseOffset),
-            0,
-            -1,
-            sizeInBytes,
-            DefaultRecord.EMPTY_HEADERS
-        );
-
-        int unusedSizeInBytes = maxBytes - approximateSizeInBytes();
-        if (unusedSizeInBytes >= recordSizeInBytes) {
-            return true;
+        int approxUnusedSizeInBytes = maxBytes - approximateSizeInBytes();
+        if (approxUnusedSizeInBytes >= bytesNeeded) {
+            return OptionalInt.empty();
         } else if (unflushedBytes > 0) {
             recordOutput.flush();
             unflushedBytes = 0;
-            unusedSizeInBytes = maxBytes - flushedSizeInBytes();
-            return unusedSizeInBytes >= recordSizeInBytes;
-        } else {
-            return false;
+            int unusedSizeInBytes = maxBytes - flushedSizeInBytes();
+            if (unusedSizeInBytes >= bytesNeeded) {
+                return OptionalInt.empty();
+            }
         }
+
+        return OptionalInt.of(batchHeaderSizeInBytes() + bytesNeeded);
     }
 
     private int flushedSizeInBytes() {
@@ -307,4 +309,46 @@ public class BatchBuilder<T> {
         recordOutput.writeVarint(0);
         return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
     }
+
+    private int batchHeaderSizeInBytes() {
+        return AbstractRecords.recordBatchHeaderSizeInBytes(
+            RecordBatch.MAGIC_VALUE_V2,
+            compressionType
+        );
+    }
+
+    private int bytesNeededForRecords(
+        Collection<T> records,
+        ObjectSerializationCache serializationCache
+    ) {
+        long expectedNextOffset = nextOffset;
+        int bytesNeeded = 0;
+        for (T record : records) {
+            if (expectedNextOffset - baseOffset >= Integer.MAX_VALUE) {
+                throw new IllegalArgumentException(
+                    String.format(
+                        "Adding %s records to a batch with base offset of %s and next offset of %s",
+                        records.size(),
+                        baseOffset,
+                        expectedNextOffset
+                    )
+                );
+            }
+
+            int recordSizeInBytes = DefaultRecord.sizeOfBodyInBytes(
+                (int) (expectedNextOffset  - baseOffset),
+                0,
+                -1,
+                serde.recordSize(record, serializationCache),
+                DefaultRecord.EMPTY_HEADERS
+            );
+
+            bytesNeeded = Math.addExact(bytesNeeded, ByteUtils.sizeOfVarint(recordSizeInBytes));
+            bytesNeeded = Math.addExact(bytesNeeded, recordSizeInBytes);
+
+            expectedNextOffset += 1;
+        }
+
+        return bytesNeeded;
+    }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
index 24e289d..b32168e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
@@ -19,7 +19,11 @@ package org.apache.kafka.raft.internals;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.protocol.Writable;
+import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
@@ -29,6 +33,8 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -164,47 +170,85 @@ class BatchAccumulatorTest {
 
     @Test
     public void testSingleBatchAccumulation() {
-        int leaderEpoch = 17;
-        long baseOffset = 157;
-        int lingerMs = 50;
-        int maxBatchSize = 512;
-
-        Mockito.when(memoryPool.tryAllocate(maxBatchSize))
-            .thenReturn(ByteBuffer.allocate(maxBatchSize));
-
-        BatchAccumulator<String> acc = buildAccumulator(
-            leaderEpoch,
-            baseOffset,
-            lingerMs,
-            maxBatchSize
-        );
-
-        List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
-        assertEquals(baseOffset, acc.append(leaderEpoch, records.subList(0, 1)));
-        assertEquals(baseOffset + 2, acc.append(leaderEpoch, records.subList(1, 3)));
-        assertEquals(baseOffset + 5, acc.append(leaderEpoch, records.subList(3, 6)));
-        assertEquals(baseOffset + 7, acc.append(leaderEpoch, records.subList(6, 8)));
-        assertEquals(baseOffset + 8, acc.append(leaderEpoch, records.subList(8, 9)));
-
-        time.sleep(lingerMs);
-        assertTrue(acc.needsDrain(time.milliseconds()));
-
-        List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
-        assertEquals(1, batches.size());
-        assertFalse(acc.needsDrain(time.milliseconds()));
-        assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
-
-        BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
-        assertEquals(records, batch.records);
-        assertEquals(baseOffset, batch.baseOffset);
+        asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+            int leaderEpoch = 17;
+            long baseOffset = 157;
+            int lingerMs = 50;
+            int maxBatchSize = 512;
+
+            Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+            BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            );
+
+            List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
+            assertEquals(baseOffset, appender.call(acc, leaderEpoch, records.subList(0, 1)));
+            assertEquals(baseOffset + 2, appender.call(acc, leaderEpoch, records.subList(1, 3)));
+            assertEquals(baseOffset + 5, appender.call(acc, leaderEpoch, records.subList(3, 6)));
+            assertEquals(baseOffset + 7, appender.call(acc, leaderEpoch, records.subList(6, 8)));
+            assertEquals(baseOffset + 8, appender.call(acc, leaderEpoch, records.subList(8, 9)));
+
+            time.sleep(lingerMs);
+            assertTrue(acc.needsDrain(time.milliseconds()));
+
+            List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+            assertEquals(1, batches.size());
+            assertFalse(acc.needsDrain(time.milliseconds()));
+            assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
+
+            BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
+            assertEquals(records, batch.records);
+            assertEquals(baseOffset, batch.baseOffset);
+        });
     }
 
     @Test
     public void testMultipleBatchAccumulation() {
+        asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+            int leaderEpoch = 17;
+            long baseOffset = 157;
+            int lingerMs = 50;
+            int maxBatchSize = 256;
+
+            Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+            BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            );
+
+            // Append entries until we have 4 batches to drain (3 completed, 1 building)
+            while (acc.numCompletedBatches() < 3) {
+                appender.call(acc, leaderEpoch, singletonList("foo"));
+            }
+
+            List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+            assertEquals(4, batches.size());
+            assertTrue(batches.stream().allMatch(batch -> batch.data.sizeInBytes() <= maxBatchSize));
+        });
+    }
+
+    @Test
+    public void testRecordsAreSplit() {
         int leaderEpoch = 17;
         long baseOffset = 157;
         int lingerMs = 50;
-        int maxBatchSize = 256;
+        String record = "a";
+        int numberOfRecords = 9;
+        int recordsPerBatch = 2;
+        int batchHeaderSize = AbstractRecords.recordBatchHeaderSizeInBytes(
+            RecordBatch.MAGIC_VALUE_V2,
+            CompressionType.NONE
+        );
+        int maxBatchSize = batchHeaderSize + recordsPerBatch * recordSizeInBytes(record, recordsPerBatch);
 
         Mockito.when(memoryPool.tryAllocate(maxBatchSize))
             .thenReturn(ByteBuffer.allocate(maxBatchSize));
@@ -216,13 +260,19 @@ class BatchAccumulatorTest {
             maxBatchSize
         );
 
-        // Append entries until we have 4 batches to drain (3 completed, 1 building)
-        while (acc.numCompletedBatches() < 3) {
-            acc.append(leaderEpoch, singletonList("foo"));
-        }
+        List<String> records = Stream
+            .generate(() -> record)
+            .limit(numberOfRecords)
+            .collect(Collectors.toList());
+        assertEquals(baseOffset + numberOfRecords - 1, acc.append(leaderEpoch, records));
+
+        time.sleep(lingerMs);
+        assertTrue(acc.needsDrain(time.milliseconds()));
 
         List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
-        assertEquals(4, batches.size());
+        // ceilingDiv(records.size(), recordsPerBatch)
+        int expectedBatches = (records.size() + recordsPerBatch - 1) / recordsPerBatch;
+        assertEquals(expectedBatches, batches.size());
         assertTrue(batches.stream().allMatch(batch -> batch.data.sizeInBytes() <= maxBatchSize));
     }
 
@@ -306,4 +356,35 @@ class BatchAccumulatorTest {
         });
     }
 
+    int recordSizeInBytes(String record, int numberOfRecords) {
+        int serdeSize = serde.recordSize("a", new ObjectSerializationCache());
+
+        int recordSizeInBytes = DefaultRecord.sizeOfBodyInBytes(
+            numberOfRecords,
+            0,
+            -1,
+            serdeSize,
+            DefaultRecord.EMPTY_HEADERS
+        );
+
+        return ByteUtils.sizeOfVarint(recordSizeInBytes) + recordSizeInBytes;
+    }
+
+    static interface Appender {
+        Long call(BatchAccumulator<String> acc, int epoch, List<String> records);
+    }
+
+    static final Appender APPEND_ATOMIC = new Appender() {
+        @Override
+        public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
+            return acc.appendAtomic(epoch, records);
+        }
+    };
+
+    static final Appender APPEND = new Appender() {
+        @Override
+        public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
+            return acc.append(epoch, records);
+        }
+    };
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
index f860df7..e4611f1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -69,8 +68,8 @@ class BatchBuilderTest {
 
         records.forEach(record -> builder.appendRecord(record, null));
         MemoryRecords builtRecordSet = builder.build();
-        assertFalse(builder.hasRoomFor(1));
-        assertThrows(IllegalArgumentException.class, () -> builder.appendRecord("a", null));
+        assertTrue(builder.bytesNeeded(Arrays.asList("a"), null).isPresent());
+        assertThrows(IllegalStateException.class, () -> builder.appendRecord("a", null));
 
         List<MutableRecordBatch> builtBatches = Utils.toList(builtRecordSet.batchIterator());
         assertEquals(1, builtBatches.size());
@@ -112,9 +111,8 @@ class BatchBuilderTest {
         );
 
         String record = "i am a record";
-        int recordSize = serde.recordSize(record);
 
-        while (builder.hasRoomFor(recordSize)) {
+        while (!builder.bytesNeeded(Arrays.asList(record), null).isPresent()) {
             builder.appendRecord(record, null);
         }
 
@@ -125,5 +123,4 @@ class BatchBuilderTest {
         assertTrue(sizeInBytes <= batchSize, "Built batch size "
             + sizeInBytes + " is larger than max batch size " + batchSize);
     }
-
 }


[kafka] 02/02: KAFKA-12332; Error partitions from topics with invalid IDs in LISR requests (#10143)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0b4aa2b2dfdf11a7b6bcc885e8161b6f2c0ef5e1
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Fri Feb 19 14:08:00 2021 -0500

    KAFKA-12332; Error partitions from topics with invalid IDs in LISR requests (#10143)
    
    Changes how invalid topic IDs are handled in LeaderAndIsr requests. The topic ID check now occurs before the leader epoch check. If the log already has a topic ID and it does not match the ID in the request, the partition is ignored and a new `INCONSISTENT_TOPIC_ID` error is returned in the response.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../errors/InconsistentTopicIdException.java       | 27 ++++++++++
 .../org/apache/kafka/common/protocol/Errors.java   |  4 +-
 core/src/main/scala/kafka/cluster/Partition.scala  | 40 +++++++++++++-
 .../main/scala/kafka/server/ReplicaManager.scala   | 31 +++--------
 .../unit/kafka/server/ReplicaManagerTest.scala     | 62 ++++++++++++++++++++--
 5 files changed, 134 insertions(+), 30 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InconsistentTopicIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/InconsistentTopicIdException.java
new file mode 100644
index 0000000..1dfe468
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InconsistentTopicIdException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InconsistentTopicIdException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InconsistentTopicIdException(String message) {
+        super(message);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 03c1248..34c4206 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
 import org.apache.kafka.common.errors.InconsistentVoterSetException;
 import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
@@ -354,7 +355,8 @@ public enum Errors {
         "Requested position is not greater than or equal to zero, and less than the size of the snapshot.",
         PositionOutOfRangeException::new),
     UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", UnknownTopicIdException::new),
-    DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);
+    DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new),
+    INCONSISTENT_TOPIC_ID(102, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 6f6cb88..cfd029b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
@@ -428,6 +428,44 @@ class Partition(val topicPartition: TopicPartition,
       this.log = Some(log)
   }
 
+  /**
+   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
+   *
+   * @param requestTopicId the topic ID from the request
+   * @return true if the request topic id is consistent, false otherwise
+   */
+  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
+    // If the request topic ID is null or the zero UUID, then we assume that topic IDs are not supported.
+    // The topic ID was not inconsistent, so return true.
+    // If the log does not exist yet, then we cannot say that the topic ID is inconsistent, so return true.
+    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
+      true
+    else {
+      log match {
+        case None => true
+        case Some(log) => {
+          // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+          // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+          // Topic ID is consistent since we are just setting it here.
+          if (log.topicId == Uuid.ZERO_UUID) {
+            log.partitionMetadataFile.write(requestTopicId)
+            log.topicId = requestTopicId
+            true
+          } else if (log.topicId != requestTopicId) {
+            stateChangeLogger.error(s"Topic Id in memory: ${log.topicId} does not" +
+              s" match the topic Id for partition $topicPartition provided in the request: " +
+              s"$requestTopicId.")
+            false
+          } else {
+            // topic ID in log exists and matches request topic ID
+            true
+          }
+        }
+      }
+    }
+  }
+
   // remoteReplicas will be called in the hot path, and must be inexpensive
   def remoteReplicas: Iterable[Replica] =
     remoteReplicasMap.values
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ba50c86..820af7b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -37,7 +37,7 @@ import kafka.server.metadata.ConfigRepository
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -1364,11 +1364,15 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
+              val requestTopicId = topicIds.get(topicPartition.topic)
+
+              if (!partition.checkOrSetTopicId(requestTopicId)) {
+                responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
+              } else if (requestLeaderEpoch > currentLeaderEpoch) {
                 // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
                 // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
                 if (partitionState.replicas.contains(localBrokerId))
@@ -1424,27 +1428,8 @@ class ReplicaManager(val config: KafkaConfig,
            * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
            * we need to map this topic-partition to OfflinePartition instead.
            */
-            val local = localLog(topicPartition)
-            if (local.isEmpty)
+            if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
-            else {
-              val id = topicIds.get(topicPartition.topic())
-              // Ensure we have not received a request from an older protocol
-              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
-                val log = local.get
-                // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
-                // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
-                if (log.topicId.equals(Uuid.ZERO_UUID)) {
-                  log.partitionMetadataFile.write(id)
-                  log.topicId = id
-                  // Warn if the topic ID in the request does not match the log.
-                } else if (!log.topicId.equals(id)) {
-                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
-                    s" match the topic Id provided in the request: " +
-                    s"${id.toString}.")
-                }
-              }
-            }
           }
 
           // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c31bf8e..9b289e5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2229,6 +2229,7 @@ class ReplicaManagerTest {
         .createLogIfNotExists(isNew = false, isFutureReplica = false,
           new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
       val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+      val topicNames = topicIds.asScala.map(_.swap).asJava
 
       def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         Seq(new LeaderAndIsrPartitionState()
@@ -2244,7 +2245,8 @@ class ReplicaManagerTest {
         topicIds,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
 
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val id = topicIds.get(topicPartition.topic())
       val log = replicaManager.localLog(topicPartition).get
@@ -2258,6 +2260,51 @@ class ReplicaManagerTest {
   }
 
   @Test
+  def testInvalidIdReturnsError() = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+      val topicNames = topicIds.asScala.map(_.swap).asJava
+
+      val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+      val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava
+
+      def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest =
+        new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ())
+      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
+
+      val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, topicIds), (_, _) => ())
+      assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition))
+
+      // Send request with a topic ID that differs from the one used in the earlier requests.
+      val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ())
+      assertEquals(Errors.INCONSISTENT_TOPIC_ID, response3.partitionErrors(invalidTopicNames).get(topicPartition))
+
+      val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(2, invalidTopicIds), (_, _) => ())
+      assertEquals(Errors.INCONSISTENT_TOPIC_ID, response4.partitionErrors(invalidTopicNames).get(topicPartition))
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
   def testPartitionMetadataFileNotCreated() = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
     try {
@@ -2268,6 +2315,7 @@ class ReplicaManagerTest {
         .createLogIfNotExists(isNew = false, isFutureReplica = false,
           new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
       val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> Uuid.randomUuid()).asJava
+      val topicNames = topicIds.asScala.map(_.swap).asJava
 
       def leaderAndIsrRequest(epoch: Int, name: String, version: Short): LeaderAndIsrRequest = LeaderAndIsrRequest.parse(
         new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch,
@@ -2285,28 +2333,32 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build().serialize(), version)
 
       // There is no file if the topic does not have an associated topic ID.
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val log = replicaManager.localLog(topicPartition).get
       assertFalse(log.partitionMetadataFile.exists())
+      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
 
       // There is no file if the topic has the default UUID.
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+      val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val log2 = replicaManager.localLog(topicPartition).get
       assertFalse(log2.partitionMetadataFile.exists())
+      assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition))
 
       // There is no file if the request is an older version
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
+      val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
       val log3 = replicaManager.localLog(topicPartitionFoo).get
       assertFalse(log3.partitionMetadataFile.exists())
+      assertEquals(Errors.NONE, response3.partitionErrors(topicNames).get(topicPartitionFoo))
 
       // There is no file if the request is an older version
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 4), (_, _) => ())
+      val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, "foo", 4), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
       val log4 = replicaManager.localLog(topicPartitionFoo).get
       assertFalse(log4.partitionMetadataFile.exists())
+      assertEquals(Errors.NONE, response4.partitionErrors(topicNames).get(topicPartitionFoo))
     } finally replicaManager.shutdown(checkpointHW = false)
   }