Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/22 05:58:57 UTC

[kafka] branch 2.3 updated (9dfdc58 -> 42aacce)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 9dfdc58  KAFKA-8265: Fix config name to match KIP-458. (#6755)
     new d05b471  MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)
     new 42aacce  HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internals/OffsetsForLeaderEpochClient.java     |   2 +-
 .../apache/kafka/common/requests/FetchRequest.java |   2 +-
 .../kafka/common/requests/FetchResponse.java       |  14 +-
 .../requests/OffsetsForLeaderEpochRequest.java     |  15 +-
 .../resources/common/message/HeartbeatRequest.json |   3 +-
 .../resources/common/message/JoinGroupRequest.json |   3 +-
 .../common/message/OffsetCommitRequest.json        |   3 +-
 .../resources/common/message/SyncGroupRequest.json |   3 +-
 .../clients/consumer/internals/FetcherTest.java    |  14 +-
 .../apache/kafka/common/message/MessageTest.java   | 196 +++++++++++++++------
 .../kafka/common/requests/RequestResponseTest.java |  25 +--
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../server/OffsetsForLeaderEpochRequestTest.scala  |   4 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala  |   2 +-
 .../apache/kafka/message/MessageDataGenerator.java |   6 +-
 17 files changed, 207 insertions(+), 91 deletions(-)


[kafka] 02/02: HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 42aaccec8b86fc26dc6da5ea3f1c3c7fd7a496a5
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue May 21 22:51:56 2019 -0700

    HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)
    
    KIP-345 and KIP-392 introduced a couple of breaking changes for old versions of the bumped protocols. This patch fixes them.
    
    Reviewers: Colin Patrick McCabe <cm...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Boyang Chen <bc...@outlook.com>, Guozhang Wang <wa...@gmail.com>
---
 .../requests/OffsetsForLeaderEpochRequest.java     |   2 +-
 .../resources/common/message/HeartbeatRequest.json |   3 +-
 .../resources/common/message/JoinGroupRequest.json |   3 +-
 .../common/message/OffsetCommitRequest.json        |   3 +-
 .../resources/common/message/SyncGroupRequest.json |   3 +-
 .../apache/kafka/common/message/MessageTest.java   | 196 +++++++++++++++------
 .../kafka/common/requests/RequestResponseTest.java |   9 +-
 .../apache/kafka/message/MessageDataGenerator.java |   6 +-
 8 files changed, 165 insertions(+), 60 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 6599a70..d3df6cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -166,7 +166,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct requestStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
-        requestStruct.set(REPLICA_ID, replicaId);
+        requestStruct.setIfExists(REPLICA_ID, replicaId);
 
         Map<String, Map<Integer, PartitionData>> topicsToPartitionEpochs = CollectionUtils.groupPartitionDataByTopic(epochsByPartition);
 
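Note: the one-line change above is the crux of the OffsetsForLeaderEpoch fix. The replica_id field exists only in newer versions of the request schema, and Struct.set throws when the negotiated version does not define the field, while setIfExists silently skips it. A minimal self-contained sketch of that contract (hypothetical classes; Kafka's real Struct lives in org.apache.kafka.common.protocol.types):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public class SetIfExistsSketch {
        private final Set<String> schemaFields;              // fields the negotiated version defines
        private final Map<String, Object> values = new HashMap<>();

        SetIfExistsSketch(Set<String> schemaFields) {
            this.schemaFields = schemaFields;
        }

        void set(String name, Object value) {                // old call site: throws on old versions
            if (!schemaFields.contains(name))
                throw new IllegalStateException("Unknown field: " + name);
            values.put(name, value);
        }

        void setIfExists(String name, Object value) {        // fixed call site: silently skipped
            if (schemaFields.contains(name))
                values.put(name, value);
        }

        public static void main(String[] args) {
            // An old schema version with no replica_id field:
            SetIfExistsSketch struct = new SetIfExistsSketch(Collections.singleton("topics"));
            struct.setIfExists("replica_id", -2);            // no-op instead of an exception
            struct.set("topics", "...");
        }
    }
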
diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json b/clients/src/main/resources/common/message/HeartbeatRequest.json
index 148e661..aa7f337 100644
--- a/clients/src/main/resources/common/message/HeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/HeartbeatRequest.json
@@ -27,7 +27,8 @@
       "about": "The generation of the group." },
     { "name": "MemberId", "type": "string", "versions": "0+",
       "about": "The member ID." },
-    { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
+    { "name": "GroupInstanceId", "type": "string", "versions": "3+",
+      "nullableVersions": "3+", "default": "null",
       "about": "The unique identifier of the consumer instance provided by end user." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 6db24da..8e58b87 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -34,7 +34,8 @@
       "about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
     { "name": "MemberId", "type": "string", "versions": "0+",
       "about": "The member id assigned by the group coordinator." },
-    { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+",
+    { "name": "GroupInstanceId", "type": "string", "versions": "5+", 
+      "nullableVersions": "5+", "default": "null",
       "about": "The unique identifier of the consumer instance provided by end user." },
     { "name": "ProtocolType", "type": "string", "versions": "0+",
       "about": "The unique name the for class of protocols implemented by the group we want to join." },
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index 0ad7565..adda079 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -35,7 +35,8 @@
       "about": "The generation of the group." },
     { "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true,
       "about": "The member ID assigned by the group coordinator." },
-    { "name": "GroupInstanceId", "type": "string", "versions": "7+", "ignorable": true, "nullableVersions": "7+",
+    { "name": "GroupInstanceId", "type": "string", "versions": "7+",
+      "nullableVersions": "7+", "default": "null",
       "about": "The unique identifier of the consumer instance provided by end user." },
     { "name": "RetentionTimeMs", "type": "int64", "versions": "2-4", "default": "-1", "ignorable": true,
       "about": "The time period in ms to retain the offset." },
diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json b/clients/src/main/resources/common/message/SyncGroupRequest.json
index 282cb2a..3ace70d 100644
--- a/clients/src/main/resources/common/message/SyncGroupRequest.json
+++ b/clients/src/main/resources/common/message/SyncGroupRequest.json
@@ -27,7 +27,8 @@
       "about": "The generation of the group." },
     { "name": "MemberId", "type": "string", "versions": "0+",
       "about": "The member ID assigned by the group." },
-    { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+",
+    { "name": "GroupInstanceId", "type": "string", "versions": "3+", 
+      "nullableVersions": "3+", "default": "null",
       "about": "The unique identifier of the consumer instance provided by end user." },
     { "name": "Assignments", "type": "[]SyncGroupRequestAssignment", "versions": "0+",
       "about": "Each assignment.", "fields": [
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 1780a71..547dbc4 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.common.message;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Message;
@@ -25,22 +27,20 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.BoundField;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
-import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -49,50 +49,127 @@ public final class MessageTest {
     @Rule
     final public Timeout globalTimeout = Timeout.millis(120000);
 
-    /**
-     * Test serializing and deserializing some messages.
-     */
     @Test
-    public void testRoundTrips() throws Exception {
-        testMessageRoundTrips(new MetadataRequestData().setTopics(
-            Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"),
-                new MetadataRequestData.MetadataRequestTopic().setName("bar")
-            )), (short) 6);
-        testMessageRoundTrips(new AddOffsetsToTxnRequestData().
-            setTransactionalId("foobar").
-            setProducerId(0xbadcafebadcafeL).
-            setProducerEpoch((short) 123).
-            setGroupId("baaz"), (short) 1);
-        testMessageRoundTrips(new AddOffsetsToTxnResponseData().
-            setThrottleTimeMs(42).
-            setErrorCode((short) 0), (short) 0);
-        testMessageRoundTrips(new AddPartitionsToTxnRequestData().
-            setTransactionalId("blah").
-            setProducerId(0xbadcafebadcafeL).
-            setProducerEpoch((short) 30000).
-            setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
-                new AddPartitionsToTxnTopic().
-                    setName("Topic").
-                    setPartitions(Collections.singletonList(1))).iterator())));
-        testMessageRoundTrips(new CreateTopicsRequestData().
-            setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection()));
-        testMessageRoundTrips(new DescribeAclsRequestData().
-            setResourceType((byte) 42).
-            setResourceNameFilter(null).
-            setResourcePatternType((byte) 3).
-            setPrincipalFilter("abc").
-            setHostFilter(null).
-            setOperation((byte) 0).
-            setPermissionType((byte) 0), (short) 0);
-        testMessageRoundTrips(new MetadataRequestData().
-            setTopics(null).
-            setAllowAutoTopicCreation(false).
-            setIncludeClusterAuthorizedOperations(false).
-            setIncludeTopicAuthorizedOperations(false));
+    public void testAddOffsetsToTxnVersions() throws Exception {
+        testAllMessageRoundTrips(new AddOffsetsToTxnRequestData().
+                setTransactionalId("foobar").
+                setProducerId(0xbadcafebadcafeL).
+                setProducerEpoch((short) 123).
+                setGroupId("baaz"));
+        testAllMessageRoundTrips(new AddOffsetsToTxnResponseData().
+                setThrottleTimeMs(42).
+                setErrorCode((short) 0));
+    }
+
+    @Test
+    public void testAddPartitionsToTxnVersions() throws Exception {
+        testAllMessageRoundTrips(new AddPartitionsToTxnRequestData().
+                setTransactionalId("blah").
+                setProducerId(0xbadcafebadcafeL).
+                setProducerEpoch((short) 30000).
+                setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
+                        new AddPartitionsToTxnTopic().
+                                setName("Topic").
+                                setPartitions(Collections.singletonList(1))).iterator())));
+    }
+
+    @Test
+    public void testCreateTopicsVersions() throws Exception {
+        testAllMessageRoundTrips(new CreateTopicsRequestData().
+                setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection()));
     }
 
-    private void testMessageRoundTrips(Message message) throws Exception {
-        testMessageRoundTrips(message, message.highestSupportedVersion());
+    @Test
+    public void testDescribeAclsRequest() throws Exception {
+        testAllMessageRoundTrips(new DescribeAclsRequestData().
+                setResourceType((byte) 42).
+                setResourceNameFilter(null).
+                setResourcePatternType((byte) 3).
+                setPrincipalFilter("abc").
+                setHostFilter(null).
+                setOperation((byte) 0).
+                setPermissionType((byte) 0));
+    }
+
+    @Test
+    public void testMetadataVersions() throws Exception {
+        testAllMessageRoundTrips(new MetadataRequestData().setTopics(
+                Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"),
+                        new MetadataRequestData.MetadataRequestTopic().setName("bar")
+                )));
+        testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+                setTopics(null).
+                setAllowAutoTopicCreation(true).
+                setIncludeClusterAuthorizedOperations(false).
+                setIncludeTopicAuthorizedOperations(false), (short) 1);
+        testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+                setTopics(null).
+                setAllowAutoTopicCreation(false).
+                setIncludeClusterAuthorizedOperations(false).
+                setIncludeTopicAuthorizedOperations(false), (short) 4);
+    }
+
+    @Test
+    public void testHeartbeatVersions() throws Exception {
+        Supplier<HeartbeatRequestData> newRequest = () -> new HeartbeatRequestData()
+                .setGroupId("groupId")
+                .setMemberId("memberId")
+                .setGenerationId(15);
+        testAllMessageRoundTrips(newRequest.get());
+        testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
+        testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 3);
+    }
+
+    @Test
+    public void testJoinGroupVersions() throws Exception {
+        Supplier<JoinGroupRequestData> newRequest = () -> new JoinGroupRequestData()
+                .setGroupId("groupId")
+                .setMemberId("memberId")
+                .setProtocolType("consumer")
+                .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection())
+                .setSessionTimeoutMs(10000);
+        testAllMessageRoundTrips(newRequest.get());
+        testAllMessageRoundTripsFromVersion(newRequest.get().setRebalanceTimeoutMs(20000), (short) 1);
+        testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
+        testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 5);
+    }
+
+    @Test
+    public void testSyncGroupDefaultGroupInstanceId() throws Exception {
+        Supplier<SyncGroupRequestData> request = () -> new SyncGroupRequestData()
+                .setGroupId("groupId")
+                .setMemberId("memberId")
+                .setGenerationId(15)
+                .setAssignments(new ArrayList<>());
+        testAllMessageRoundTrips(request.get());
+        testAllMessageRoundTrips(request.get().setGroupInstanceId(null));
+        testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 3);
+    }
+
+    @Test
+    public void testOffsetCommitDefaultGroupInstanceId() throws Exception {
+        testAllMessageRoundTrips(new OffsetCommitRequestData()
+                .setTopics(new ArrayList<>())
+                .setGroupId("groupId"));
+
+        Supplier<OffsetCommitRequestData> request = () -> new OffsetCommitRequestData()
+                .setGroupId("groupId")
+                .setMemberId("memberId")
+                .setTopics(new ArrayList<>())
+                .setGenerationId(15);
+        testAllMessageRoundTripsFromVersion(request.get(), (short) 1);
+        testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId(null), (short) 1);
+        testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 7);
+    }
+
+    private void testAllMessageRoundTrips(Message message) throws Exception {
+        testAllMessageRoundTripsFromVersion(message, message.lowestSupportedVersion());
+    }
+
+    private void testAllMessageRoundTripsFromVersion(Message message, short fromVersion) throws Exception {
+        for (short version = fromVersion; version < message.highestSupportedVersion(); version++) {
+            testMessageRoundTrips(message, version);
+        }
     }
 
     private void testMessageRoundTrips(Message message, short version) throws Exception {
@@ -294,6 +371,25 @@ public final class MessageTest {
                 new FetchRequestData.ForgottenTopic().setName("foo"))));
     }
 
+    @Test
+    public void testNonIgnorableFieldWithDefaultNull() throws Exception {
+        // Test non-ignorable string field `groupInstanceId` with default null
+        verifySizeRaisesUve((short) 0, "groupInstanceId", new HeartbeatRequestData()
+                .setGroupId("groupId")
+                .setGenerationId(15)
+                .setMemberId("memberId")
+                .setGroupInstanceId("instanceId"));
+        verifySizeSucceeds((short) 0, new HeartbeatRequestData()
+                .setGroupId("groupId")
+                .setGenerationId(15)
+                .setMemberId("memberId")
+                .setGroupInstanceId(null));
+        verifySizeSucceeds((short) 0, new HeartbeatRequestData()
+                .setGroupId("groupId")
+                .setGenerationId(15)
+                .setMemberId("memberId"));
+    }
+
     private void verifySizeRaisesUve(short version, String problemFieldName,
                                      Message message) throws Exception {
         try {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d7e9223..e8f349f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -290,9 +290,10 @@ public class RequestResponseTest {
         checkRequest(createListOffsetRequest(0), true);
         checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true);
         checkResponse(createListOffsetResponse(0), 0, true);
-        checkRequest(createLeaderEpochRequest(), true);
+        checkRequest(createLeaderEpochRequest(0), true);
+        checkRequest(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), true);
         checkResponse(createLeaderEpochResponse(), 0, true);
-        checkErrorResponse(createLeaderEpochRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), new UnknownServerException(), true);
         checkRequest(createAddPartitionsToTxnRequest(), true);
         checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true);
         checkResponse(createAddPartitionsToTxnResponse(), 0, true);
@@ -1250,7 +1251,7 @@ public class RequestResponseTest {
         return new InitProducerIdResponse(responseData);
     }
 
-    private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
+    private OffsetsForLeaderEpochRequest createLeaderEpochRequest(int version) {
         Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>();
 
         epochs.put(new TopicPartition("topic1", 0),
@@ -1260,7 +1261,7 @@ public class RequestResponseTest {
         epochs.put(new TopicPartition("topic2", 2),
                 new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
 
-        return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
+        return OffsetsForLeaderEpochRequest.Builder.forConsumer((short) version, epochs).build();
     }
 
     private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index d6cd5f3..ad00137 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -928,7 +928,11 @@ public final class MessageDataGenerator {
         } else if (field.type().isBytes()) {
             buffer.printf("if (%s.length != 0) {%n", field.camelCaseName());
         } else if (field.type().isString()) {
-            buffer.printf("if (%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field));
+            if (fieldDefault(field).equals("null")) {
+                buffer.printf("if (%s != null) {%n", field.camelCaseName());
+            } else {
+                buffer.printf("if (!%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field));
+            }
         } else if (field.type() instanceof FieldType.BoolFieldType) {
             buffer.printf("if (%s%s) {%n",
                 fieldDefault(field).equals("true") ? "!" : "",
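
Note: the template fix above changes the emitted non-default-value check in two ways: the condition is negated (the old equals() form matched the default rather than a non-default value), and a null default is now compared with != null, since calling equals() on a null field would throw a NullPointerException. A runnable sketch of the fixed template logic (simplified; the real generator writes through its code buffer):

    public class NonDefaultCheckSketch {
        // Given a field name and its default literal, emit the guard that
        // detects a non-default value, mirroring the branch added above.
        static String nonDefaultCheck(String fieldName, String defaultLiteral) {
            if (defaultLiteral.equals("null")) {
                return String.format("if (%s != null) {", fieldName);
            }
            return String.format("if (!%s.equals(%s)) {", fieldName, defaultLiteral);
        }

        public static void main(String[] args) {
            System.out.println(nonDefaultCheck("groupInstanceId", "null"));
            // -> if (groupInstanceId != null) {
            System.out.println(nonDefaultCheck("memberId", "\"\""));
            // -> if (!memberId.equals("")) {
        }
    }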


[kafka] 01/02: MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d05b47112ad81bf37cff078f403544cfd65a5221
Author: David Arthur <mu...@gmail.com>
AuthorDate: Tue May 21 18:50:21 2019 -0400

    MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../internals/OffsetsForLeaderEpochClient.java         |  2 +-
 .../org/apache/kafka/common/requests/FetchRequest.java |  2 +-
 .../apache/kafka/common/requests/FetchResponse.java    | 14 ++++++++------
 .../common/requests/OffsetsForLeaderEpochRequest.java  | 13 +++++++++++--
 .../kafka/clients/consumer/internals/FetcherTest.java  | 14 +++++++-------
 .../kafka/common/requests/RequestResponseTest.java     | 18 +++++++++---------
 .../main/scala/kafka/server/ReplicaFetcherThread.scala |  2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala          |  2 +-
 .../server/OffsetsForLeaderEpochRequestTest.scala      |  4 ++--
 .../scala/unit/kafka/server/RequestQuotaTest.scala     |  2 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala      |  2 +-
 11 files changed, 43 insertions(+), 32 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index 9ffedd1..d7b02a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -53,7 +53,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
             fetchEpoch -> partitionData.put(topicPartition,
                 new OffsetsForLeaderEpochRequest.PartitionData(fetchPosition.currentLeader.epoch, fetchEpoch))));
 
-        return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData);
+        return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 485b102..da09df3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -459,7 +459,7 @@ public class FetchRequest extends AbstractRequest {
         for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
             FetchResponse.PartitionData<MemoryRecords> partitionResponse = new FetchResponse.PartitionData<>(error,
                 FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY);
+                FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
         return new FetchResponse<>(error, responseData, throttleTimeMs, metadata.sessionId());
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index e857b5b..942b0d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.function.Predicate;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
@@ -223,7 +224,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
-    public static final int UNSPECIFIED_PREFERRED_REPLICA = -1;
+    public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
     private final int throttleTimeMs;
     private final Errors error;
@@ -277,14 +278,14 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                              long highWatermark,
                              long lastStableOffset,
                              long logStartOffset,
-                             Integer preferredReadReplica,
+                             Optional<Integer> preferredReadReplica,
                              List<AbortedTransaction> abortedTransactions,
                              T records) {
             this.error = error;
             this.highWatermark = highWatermark;
             this.lastStableOffset = lastStableOffset;
             this.logStartOffset = logStartOffset;
-            this.preferredReadReplica = Optional.ofNullable(preferredReadReplica);
+            this.preferredReadReplica = preferredReadReplica;
             this.abortedTransactions = abortedTransactions;
             this.records = records;
         }
@@ -379,7 +380,9 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                 long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
                 long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
                 long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                int preferredReadReplica = partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, UNSPECIFIED_PREFERRED_REPLICA);
+                Optional<Integer> preferredReadReplica = Optional.of(
+                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
+                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
 
                 BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
                 if (!(baseRecords instanceof MemoryRecords))
@@ -401,8 +404,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                 }
 
                 PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica == UNSPECIFIED_PREFERRED_REPLICA ? null : preferredReadReplica,
-                        abortedTransactions, records);
+                        logStartOffset, preferredReadReplica, abortedTransactions, records);
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
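
Note: the parsing change above folds the old -1 sentinel and nullable Integer into a single Optional: the wire value is wrapped, then filtered so that the sentinel maps to Optional.empty(). The same decode in isolation:

    import java.util.Optional;
    import java.util.function.Predicate;

    public class PreferredReplicaDecode {
        static final int INVALID_PREFERRED_REPLICA_ID = -1;  // "no preferred read replica" on the wire

        static Optional<Integer> decode(int wireValue) {
            return Optional.of(wireValue)
                    .filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
        }

        public static void main(String[] args) {
            System.out.println(decode(1));                   // Optional[1]
            System.out.println(decode(-1));                  // Optional.empty
        }
    }
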
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 5052b0e..6599a70 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -101,10 +101,19 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
         private final Map<TopicPartition, PartitionData> epochsByPartition;
         private final int replicaId;
 
-        public Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition) {
+        Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
             super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
             this.epochsByPartition = epochsByPartition;
-            this.replicaId = CONSUMER_REPLICA_ID;
+            this.replicaId = replicaId;
+        }
+
+        public static Builder forConsumer(short version, Map<TopicPartition, PartitionData> epochsByPartition) {
+            return new Builder(version, epochsByPartition, CONSUMER_REPLICA_ID);
+        }
+
+        public static Builder forFollower(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
+            return new Builder(version, epochsByPartition, replicaId);
+
         }
 
         @Override
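
Note: with the constructor narrowed to package-private, callers now choose the factory that matches their role: consumers keep the CONSUMER_REPLICA_ID sentinel, while follower brokers pass their own id through to the request so the leader can tell the two apart (the ReplicaFetcherThread change below is the follower call site). A usage sketch against the API as changed here, with invented topic/partition/epoch values:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Optional;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;

    public class EpochRequestBuilders {
        public static void main(String[] args) {
            short version = ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion();
            Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs =
                    Collections.singletonMap(
                            new TopicPartition("topic1", 0),
                            new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(5), 3));

            // Consumer path: replicaId stays at the CONSUMER_REPLICA_ID sentinel.
            OffsetsForLeaderEpochRequest consumerRequest =
                    OffsetsForLeaderEpochRequest.Builder.forConsumer(version, epochs).build();

            // Follower path: the fetcher passes its own broker id (1 here).
            OffsetsForLeaderEpochRequest followerRequest =
                    OffsetsForLeaderEpochRequest.Builder.forFollower(version, epochs, 1).build();
        }
    }
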
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 2f40ffc..0e2662a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1173,7 +1173,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = new HashMap<>();
         partitions.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records));
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));
 
@@ -1184,7 +1184,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         partitions = new HashMap<>();
         partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID));
         consumerClient.poll(time.timer(0));
         assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
@@ -3341,7 +3341,7 @@ public class FetcherTest {
 
         // Set preferred read replica to node=1
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -3358,7 +3358,7 @@ public class FetcherTest {
 
         // Set preferred read replica to node=2, which isn't in our metadata, should revert to leader
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 2));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(2)));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
         fetchedRecords();
@@ -3379,7 +3379,7 @@ public class FetcherTest {
         assertFalse(fetcher.hasCompletedFetches());
 
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -3393,7 +3393,7 @@ public class FetcherTest {
         assertFalse(fetcher.hasCompletedFetches());
 
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, null));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.empty()));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -3449,7 +3449,7 @@ public class FetcherTest {
     }
 
     private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
-                                                           long lastStableOffset, int throttleTime, Integer preferredReplicaId) {
+                                                           long lastStableOffset, int throttleTime, Optional<Integer> preferredReplicaId) {
         Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
                 new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L,
                         preferredReplicaId, null, records));
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 1cef864..d7e9223 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -559,7 +559,7 @@ public class RequestResponseTest {
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(
                 Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                0L, null, null, records));
+                0L, Optional.empty(), null, records));
 
         FetchResponse<MemoryRecords> v0Response = new FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID);
         FetchResponse<MemoryRecords> v1Response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
@@ -583,11 +583,11 @@ public class RequestResponseTest {
                 new FetchResponse.AbortedTransaction(15, 50)
         );
         responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData<>(Errors.NONE, 100000,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, abortedTransactions, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), abortedTransactions, records));
         responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData<>(Errors.NONE, 900000,
-                5, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
+                5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records));
         responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData<>(Errors.NONE, 70000,
-                6, FetchResponse.INVALID_LOG_START_OFFSET, null, Collections.emptyList(), records));
+                6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), Collections.emptyList(), records));
 
         FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
         FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -751,11 +751,11 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
+            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), null, records));
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
             new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY));
+            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
         return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
     }
 
@@ -763,12 +763,12 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), null, records));
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
                 new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
-                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
 
         return new FetchResponse<>(Errors.NONE, responseData, 25, INVALID_SESSION_ID);
     }
@@ -1260,7 +1260,7 @@ public class RequestResponseTest {
         epochs.put(new TopicPartition("topic2", 2),
                 new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
 
-        return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
+        return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
     }
 
     private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8e92c2b..ab5be6e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -303,7 +303,7 @@ class ReplicaFetcherThread(name: String,
       return Map.empty
     }
 
-    val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitions.asJava)
+    val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(offsetForLeaderEpochRequestVersion, partitions.asJava, brokerConfig.brokerId)
     debug(s"Sending offset for leader epoch request $epochRequest")
 
     try {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b465773..a577b2e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -295,7 +295,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = {
     val epochs = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(27), 7))
-    new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs.asJava).build()
+    OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs.asJava).build()
   }
 
   private def createOffsetFetchRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 2cdd2e8..4d1416c 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -35,7 +35,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
     val partition = new TopicPartition(topic, 0)
 
     val epochs = Map(partition -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty[Integer], 0)).asJava
-    val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs).build()
+    val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs).build()
 
     // Unknown topic
     val randomBrokerId = servers.head.config.brokerId
@@ -61,7 +61,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
     def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
       val epochs = Map(topicPartition -> new OffsetsForLeaderEpochRequest.PartitionData(
         currentLeaderEpoch, 0)).asJava
-      val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs)
+      val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs)
         .build()
       assertResponseError(error, brokerId, request)
     }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ddcee12..047188f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -362,7 +362,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new InitProducerIdRequest.Builder(requestData)
 
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
-          new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
+          OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
             Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(15), 0)).asJava)
 
         case ApiKeys.ADD_PARTITIONS_TO_TXN =>
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index cb8996c..8a6dcba 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -279,7 +279,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
       val partitionData = partitions.mapValues(
         new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), _))
-      val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
+      val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
         partitionData.asJava)
       val response = sender.sendRequest(request)
       response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala