You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/22 05:58:58 UTC

[kafka] 01/02: MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d05b47112ad81bf37cff078f403544cfd65a5221
Author: David Arthur <mu...@gmail.com>
AuthorDate: Tue May 21 18:50:21 2019 -0400

    MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../internals/OffsetsForLeaderEpochClient.java         |  2 +-
 .../org/apache/kafka/common/requests/FetchRequest.java |  2 +-
 .../apache/kafka/common/requests/FetchResponse.java    | 14 ++++++++------
 .../common/requests/OffsetsForLeaderEpochRequest.java  | 13 +++++++++++--
 .../kafka/clients/consumer/internals/FetcherTest.java  | 14 +++++++-------
 .../kafka/common/requests/RequestResponseTest.java     | 18 +++++++++---------
 .../main/scala/kafka/server/ReplicaFetcherThread.scala |  2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala          |  2 +-
 .../server/OffsetsForLeaderEpochRequestTest.scala      |  4 ++--
 .../scala/unit/kafka/server/RequestQuotaTest.scala     |  2 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala      |  2 +-
 11 files changed, 43 insertions(+), 32 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index 9ffedd1..d7b02a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -53,7 +53,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
             fetchEpoch -> partitionData.put(topicPartition,
                 new OffsetsForLeaderEpochRequest.PartitionData(fetchPosition.currentLeader.epoch, fetchEpoch))));
 
-        return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData);
+        return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 485b102..da09df3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -459,7 +459,7 @@ public class FetchRequest extends AbstractRequest {
         for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
             FetchResponse.PartitionData<MemoryRecords> partitionResponse = new FetchResponse.PartitionData<>(error,
                 FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY);
+                FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
         return new FetchResponse<>(error, responseData, throttleTimeMs, metadata.sessionId());
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index e857b5b..942b0d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.function.Predicate;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
@@ -223,7 +224,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
-    public static final int UNSPECIFIED_PREFERRED_REPLICA = -1;
+    public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
     private final int throttleTimeMs;
     private final Errors error;
@@ -277,14 +278,14 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                              long highWatermark,
                              long lastStableOffset,
                              long logStartOffset,
-                             Integer preferredReadReplica,
+                             Optional<Integer> preferredReadReplica,
                              List<AbortedTransaction> abortedTransactions,
                              T records) {
             this.error = error;
             this.highWatermark = highWatermark;
             this.lastStableOffset = lastStableOffset;
             this.logStartOffset = logStartOffset;
-            this.preferredReadReplica = Optional.ofNullable(preferredReadReplica);
+            this.preferredReadReplica = preferredReadReplica;
             this.abortedTransactions = abortedTransactions;
             this.records = records;
         }
@@ -379,7 +380,9 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                 long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
                 long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
                 long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                int preferredReadReplica = partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, UNSPECIFIED_PREFERRED_REPLICA);
+                Optional<Integer> preferredReadReplica = Optional.of(
+                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
+                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
 
                 BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
                 if (!(baseRecords instanceof MemoryRecords))
@@ -401,8 +404,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                 }
 
                 PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica == UNSPECIFIED_PREFERRED_REPLICA ? null : preferredReadReplica,
-                        abortedTransactions, records);
+                        logStartOffset, preferredReadReplica, abortedTransactions, records);
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 5052b0e..6599a70 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -101,10 +101,19 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
         private final Map<TopicPartition, PartitionData> epochsByPartition;
         private final int replicaId;
 
-        public Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition) {
+        Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
             super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
             this.epochsByPartition = epochsByPartition;
-            this.replicaId = CONSUMER_REPLICA_ID;
+            this.replicaId = replicaId;
+        }
+
+        public static Builder forConsumer(short version, Map<TopicPartition, PartitionData> epochsByPartition) {
+            return new Builder(version, epochsByPartition, CONSUMER_REPLICA_ID);
+        }
+
+        public static Builder forFollower(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
+            return new Builder(version, epochsByPartition, replicaId);
+
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 2f40ffc..0e2662a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1173,7 +1173,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = new HashMap<>();
         partitions.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records));
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));
 
@@ -1184,7 +1184,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         partitions = new HashMap<>();
         partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID));
         consumerClient.poll(time.timer(0));
         assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
@@ -3341,7 +3341,7 @@ public class FetcherTest {
 
         // Set preferred read replica to node=1
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -3358,7 +3358,7 @@ public class FetcherTest {
 
         // Set preferred read replica to node=2, which isn't in our metadata, should revert to leader
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 2));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(2)));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
         fetchedRecords();
@@ -3379,7 +3379,7 @@ public class FetcherTest {
         assertFalse(fetcher.hasCompletedFetches());
 
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -3393,7 +3393,7 @@ public class FetcherTest {
         assertFalse(fetcher.hasCompletedFetches());
 
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, null));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.empty()));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -3449,7 +3449,7 @@ public class FetcherTest {
     }
 
     private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
-                                                           long lastStableOffset, int throttleTime, Integer preferredReplicaId) {
+                                                           long lastStableOffset, int throttleTime, Optional<Integer> preferredReplicaId) {
         Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
                 new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L,
                         preferredReplicaId, null, records));
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 1cef864..d7e9223 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -559,7 +559,7 @@ public class RequestResponseTest {
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(
                 Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                0L, null, null, records));
+                0L, Optional.empty(), null, records));
 
         FetchResponse<MemoryRecords> v0Response = new FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID);
         FetchResponse<MemoryRecords> v1Response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
@@ -583,11 +583,11 @@ public class RequestResponseTest {
                 new FetchResponse.AbortedTransaction(15, 50)
         );
         responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData<>(Errors.NONE, 100000,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, abortedTransactions, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), abortedTransactions, records));
         responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData<>(Errors.NONE, 900000,
-                5, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
+                5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records));
         responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData<>(Errors.NONE, 70000,
-                6, FetchResponse.INVALID_LOG_START_OFFSET, null, Collections.emptyList(), records));
+                6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), Collections.emptyList(), records));
 
         FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
         FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -751,11 +751,11 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
+            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), null, records));
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
             new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY));
+            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
         return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
     }
 
@@ -763,12 +763,12 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), null, records));
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
                 new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
-                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
 
         return new FetchResponse<>(Errors.NONE, responseData, 25, INVALID_SESSION_ID);
     }
@@ -1260,7 +1260,7 @@ public class RequestResponseTest {
         epochs.put(new TopicPartition("topic2", 2),
                 new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
 
-        return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
+        return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
     }
 
     private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8e92c2b..ab5be6e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -303,7 +303,7 @@ class ReplicaFetcherThread(name: String,
       return Map.empty
     }
 
-    val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitions.asJava)
+    val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(offsetForLeaderEpochRequestVersion, partitions.asJava, brokerConfig.brokerId)
     debug(s"Sending offset for leader epoch request $epochRequest")
 
     try {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b465773..a577b2e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -295,7 +295,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = {
     val epochs = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(27), 7))
-    new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs.asJava).build()
+    OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs.asJava).build()
   }
 
   private def createOffsetFetchRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 2cdd2e8..4d1416c 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -35,7 +35,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
     val partition = new TopicPartition(topic, 0)
 
     val epochs = Map(partition -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty[Integer], 0)).asJava
-    val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs).build()
+    val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs).build()
 
     // Unknown topic
     val randomBrokerId = servers.head.config.brokerId
@@ -61,7 +61,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
     def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
       val epochs = Map(topicPartition -> new OffsetsForLeaderEpochRequest.PartitionData(
         currentLeaderEpoch, 0)).asJava
-      val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs)
+      val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs)
         .build()
       assertResponseError(error, brokerId, request)
     }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ddcee12..047188f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -362,7 +362,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new InitProducerIdRequest.Builder(requestData)
 
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
-          new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
+          OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
             Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(15), 0)).asJava)
 
         case ApiKeys.ADD_PARTITIONS_TO_TXN =>
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index cb8996c..8a6dcba 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -279,7 +279,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
       val partitionData = partitions.mapValues(
         new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), _))
-      val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
+      val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
         partitionData.asJava)
       val response = sender.sendRequest(request)
       response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala