You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/21 22:50:37 UTC
[kafka] branch trunk updated: MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bacb45e MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)
bacb45e is described below
commit bacb45e044aae61fd373f6535f8073263c972370
Author: David Arthur <mu...@gmail.com>
AuthorDate: Tue May 21 18:50:21 2019 -0400
MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (#6775)
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../internals/OffsetsForLeaderEpochClient.java | 2 +-
.../org/apache/kafka/common/requests/FetchRequest.java | 2 +-
.../apache/kafka/common/requests/FetchResponse.java | 14 ++++++++------
.../common/requests/OffsetsForLeaderEpochRequest.java | 13 +++++++++++--
.../kafka/clients/consumer/internals/FetcherTest.java | 14 +++++++-------
.../kafka/common/requests/RequestResponseTest.java | 18 +++++++++---------
.../main/scala/kafka/server/ReplicaFetcherThread.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../server/OffsetsForLeaderEpochRequestTest.scala | 4 ++--
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../server/epoch/LeaderEpochIntegrationTest.scala | 2 +-
11 files changed, 43 insertions(+), 32 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index 9ffedd1..d7b02a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -53,7 +53,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
fetchEpoch -> partitionData.put(topicPartition,
new OffsetsForLeaderEpochRequest.PartitionData(fetchPosition.currentLeader.epoch, fetchEpoch))));
- return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData);
+ return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 485b102..da09df3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -459,7 +459,7 @@ public class FetchRequest extends AbstractRequest {
for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
FetchResponse.PartitionData<MemoryRecords> partitionResponse = new FetchResponse.PartitionData<>(error,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY);
+ FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY);
responseData.put(entry.getKey(), partitionResponse);
}
return new FetchResponse<>(error, responseData, throttleTimeMs, metadata.sessionId());
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index e857b5b..942b0d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
+import java.util.function.Predicate;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
@@ -223,7 +224,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
public static final long INVALID_HIGHWATERMARK = -1L;
public static final long INVALID_LAST_STABLE_OFFSET = -1L;
public static final long INVALID_LOG_START_OFFSET = -1L;
- public static final int UNSPECIFIED_PREFERRED_REPLICA = -1;
+ public static final int INVALID_PREFERRED_REPLICA_ID = -1;
private final int throttleTimeMs;
private final Errors error;
@@ -277,14 +278,14 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
long highWatermark,
long lastStableOffset,
long logStartOffset,
- Integer preferredReadReplica,
+ Optional<Integer> preferredReadReplica,
List<AbortedTransaction> abortedTransactions,
T records) {
this.error = error;
this.highWatermark = highWatermark;
this.lastStableOffset = lastStableOffset;
this.logStartOffset = logStartOffset;
- this.preferredReadReplica = Optional.ofNullable(preferredReadReplica);
+ this.preferredReadReplica = preferredReadReplica;
this.abortedTransactions = abortedTransactions;
this.records = records;
}
@@ -379,7 +380,9 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
- int preferredReadReplica = partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, UNSPECIFIED_PREFERRED_REPLICA);
+ Optional<Integer> preferredReadReplica = Optional.of(
+ partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
+ ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
if (!(baseRecords instanceof MemoryRecords))
@@ -401,8 +404,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
}
PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
- logStartOffset, preferredReadReplica == UNSPECIFIED_PREFERRED_REPLICA ? null : preferredReadReplica,
- abortedTransactions, records);
+ logStartOffset, preferredReadReplica, abortedTransactions, records);
responseData.put(new TopicPartition(topic, partition), partitionData);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 5052b0e..6599a70 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -101,10 +101,19 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
private final Map<TopicPartition, PartitionData> epochsByPartition;
private final int replicaId;
- public Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition) {
+ Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
this.epochsByPartition = epochsByPartition;
- this.replicaId = CONSUMER_REPLICA_ID;
+ this.replicaId = replicaId;
+ }
+
+ public static Builder forConsumer(short version, Map<TopicPartition, PartitionData> epochsByPartition) {
+ return new Builder(version, epochsByPartition, CONSUMER_REPLICA_ID);
+ }
+
+ public static Builder forFollower(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
+ return new Builder(version, epochsByPartition, replicaId);
+
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 2f40ffc..0e2662a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1173,7 +1173,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = new HashMap<>();
partitions.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records));
client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
consumerClient.poll(time.timer(0));
@@ -1184,7 +1184,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
partitions = new HashMap<>();
partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY));
client.prepareResponse(new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID));
consumerClient.poll(time.timer(0));
assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
@@ -3341,7 +3341,7 @@ public class FetcherTest {
// Set preferred read replica to node=1
client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
@@ -3358,7 +3358,7 @@ public class FetcherTest {
// Set preferred read replica to node=2, which isn't in our metadata, should revert to leader
client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 2));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(2)));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
fetchedRecords();
@@ -3379,7 +3379,7 @@ public class FetcherTest {
assertFalse(fetcher.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.of(1)));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
@@ -3393,7 +3393,7 @@ public class FetcherTest {
assertFalse(fetcher.hasCompletedFetches());
client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, null));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, Optional.empty()));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
@@ -3449,7 +3449,7 @@ public class FetcherTest {
}
private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
- long lastStableOffset, int throttleTime, Integer preferredReplicaId) {
+ long lastStableOffset, int throttleTime, Optional<Integer> preferredReplicaId) {
Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L,
preferredReplicaId, null, records));
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 1cef864..d7e9223 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -559,7 +559,7 @@ public class RequestResponseTest {
MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(
Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- 0L, null, null, records));
+ 0L, Optional.empty(), null, records));
FetchResponse<MemoryRecords> v0Response = new FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID);
FetchResponse<MemoryRecords> v1Response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
@@ -583,11 +583,11 @@ public class RequestResponseTest {
new FetchResponse.AbortedTransaction(15, 50)
);
responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData<>(Errors.NONE, 100000,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, abortedTransactions, records));
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), abortedTransactions, records));
responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData<>(Errors.NONE, 900000,
- 5, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
+ 5, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, records));
responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData<>(Errors.NONE, 70000,
- 6, FetchResponse.INVALID_LOG_START_OFFSET, null, Collections.emptyList(), records));
+ 6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), Collections.emptyList(), records));
FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -751,11 +751,11 @@ public class RequestResponseTest {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), null, records));
List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
new FetchResponse.AbortedTransaction(234L, 999L));
responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
}
@@ -763,12 +763,12 @@ public class RequestResponseTest {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), null, records));
List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
new FetchResponse.AbortedTransaction(234L, 999L));
responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
- 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY));
+ 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, Optional.empty(), abortedTransactions, MemoryRecords.EMPTY));
return new FetchResponse<>(Errors.NONE, responseData, 25, INVALID_SESSION_ID);
}
@@ -1260,7 +1260,7 @@ public class RequestResponseTest {
epochs.put(new TopicPartition("topic2", 2),
new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
- return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
+ return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
}
private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8e92c2b..ab5be6e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -303,7 +303,7 @@ class ReplicaFetcherThread(name: String,
return Map.empty
}
- val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitions.asJava)
+ val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(offsetForLeaderEpochRequestVersion, partitions.asJava, brokerConfig.brokerId)
debug(s"Sending offset for leader epoch request $epochRequest")
try {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b465773..a577b2e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -295,7 +295,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = {
val epochs = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(27), 7))
- new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs.asJava).build()
+ OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs.asJava).build()
}
private def createOffsetFetchRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 2cdd2e8..4d1416c 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -35,7 +35,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
val partition = new TopicPartition(topic, 0)
val epochs = Map(partition -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty[Integer], 0)).asJava
- val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs).build()
+ val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs).build()
// Unknown topic
val randomBrokerId = servers.head.config.brokerId
@@ -61,7 +61,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
val epochs = Map(topicPartition -> new OffsetsForLeaderEpochRequest.PartitionData(
currentLeaderEpoch, 0)).asJava
- val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs)
+ val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs)
.build()
assertResponseError(error, brokerId, request)
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ddcee12..047188f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -362,7 +362,7 @@ class RequestQuotaTest extends BaseRequestTest {
new InitProducerIdRequest.Builder(requestData)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
- new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
+ OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(15), 0)).asJava)
case ApiKeys.ADD_PARTITIONS_TO_TXN =>
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index cb8996c..8a6dcba 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -279,7 +279,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
val partitionData = partitions.mapValues(
new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), _))
- val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
+ val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
partitionData.asJava)
val response = sender.sendRequest(request)
response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala