You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/28 09:57:24 UTC
kafka git commit: KAFKA-5960; MINOR: Follow-up cleanup of
Repository: kafka
Updated Branches:
refs/heads/trunk e1543a5a8 -> 675e54f9e
KAFKA-5960; MINOR: Follow-up cleanup of
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3976 from hachikuji/version-fix-followup
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/675e54f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/675e54f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/675e54f9
Branch: refs/heads/trunk
Commit: 675e54f9efefc7150b88ebf508bcc67303fe0c57
Parents: e1543a5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Sep 28 10:57:18 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Sep 28 10:57:18 2017 +0100
----------------------------------------------------------------------
.../org/apache/kafka/clients/ApiVersions.java | 2 +-
.../org/apache/kafka/clients/NetworkClient.java | 2 +-
.../apache/kafka/clients/NodeApiVersions.java | 88 +++++++-------------
.../kafka/common/requests/AbstractRequest.java | 13 ++-
.../kafka/common/requests/FetchRequest.java | 4 +-
.../common/requests/ListOffsetRequest.java | 8 +-
.../org/apache/kafka/clients/MockClient.java | 2 +-
.../kafka/clients/NodeApiVersionsTest.java | 36 ++++----
.../admin/BrokerApiVersionsCommandTest.scala | 2 +-
.../kafka/api/LegacyAdminClientTest.scala | 2 +-
10 files changed, 69 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
index 9c61ff2..8001f1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
@@ -53,7 +53,7 @@ public class ApiVersions {
// we will need to convert the messages when they are ready to be sent.
byte maxUsableMagic = RecordBatch.CURRENT_MAGIC_VALUE;
for (NodeApiVersions versions : this.nodeApiVersions.values()) {
- byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.usableVersion(ApiKeys.PRODUCE));
+ byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE));
maxUsableMagic = (byte) Math.min(nodeMaxUsableMagic, maxUsableMagic);
}
return maxUsableMagic;
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0fbaff7..ee7258a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -389,7 +389,7 @@ public class NetworkClient implements KafkaClient {
log.trace("No version information found when sending {} with correlation id {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
} else {
- version = versionInfo.usableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
+ version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
}
// The call to build may also throw UnsupportedVersionException, if there are essential
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 93a5c72..c8b3f44 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -35,7 +35,7 @@ import java.util.TreeMap;
*/
public class NodeApiVersions {
// A map of the usable versions of each API, keyed by the ApiKeys instance
- private final Map<ApiKeys, UsableVersion> usableVersions = new EnumMap<>(ApiKeys.class);
+ private final Map<ApiKeys, ApiVersion> supportedVersions = new EnumMap<>(ApiKeys.class);
// List of APIs which the broker supports, but which are unknown to the client
private final List<ApiVersion> unknownApis = new ArrayList<>();
@@ -77,7 +77,7 @@ public class NodeApiVersions {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey)) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey);
- usableVersions.put(nodeApiKey, new UsableVersion(nodeApiKey, nodeApiVersion));
+ supportedVersions.put(nodeApiKey, nodeApiVersion);
} else {
// Newer brokers may support ApiKeys we don't know about
unknownApis.add(nodeApiVersion);
@@ -88,18 +88,29 @@ public class NodeApiVersions {
/**
* Return the most recent version supported by both the node and the local software.
*/
- public short usableVersion(ApiKeys apiKey) {
- return usableVersion(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
+ public short latestUsableVersion(ApiKeys apiKey) {
+ return latestUsableVersion(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
}
/**
* Get the latest version supported by the broker within an allowed range of versions
*/
- public short usableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
- UsableVersion usableVersion = usableVersions.get(apiKey);
+ public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
+ ApiVersion usableVersion = supportedVersions.get(apiKey);
if (usableVersion == null)
throw new UnsupportedVersionException("The broker does not support " + apiKey);
- return usableVersion.latestSupportedVersion(oldestAllowedVersion, latestAllowedVersion);
+ return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion);
+ }
+
+ private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions,
+ short minAllowedVersion, short maxAllowedVersion) {
+ short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
+ short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
+ if (minVersion > maxVersion)
+ throw new UnsupportedVersionException("The broker does not support " + apiKey +
+ " with version in range [" + minAllowedVersion + "," + maxAllowedVersion + "]. The supported" +
+ " range is [" + supportedVersions.minVersion + "," + supportedVersions.maxVersion + "].");
+ return maxVersion;
}
/**
@@ -122,8 +133,8 @@ public class NodeApiVersions {
// a TreeMap before printing it out to ensure that we always print in
// ascending order.
TreeMap<Short, String> apiKeysText = new TreeMap<>();
- for (UsableVersion usableVersion : this.usableVersions.values())
- apiKeysText.put(usableVersion.apiVersion.apiKey, apiVersionToText(usableVersion.apiVersion));
+ for (ApiVersion supportedVersion : this.supportedVersions.values())
+ apiKeysText.put(supportedVersion.apiKey, apiVersionToText(supportedVersion));
for (ApiVersion apiVersion : unknownApis)
apiKeysText.put(apiVersion.apiKey, apiVersionToText(apiVersion));
@@ -166,13 +177,15 @@ public class NodeApiVersions {
}
if (apiKey != null) {
- UsableVersion usableVersion = usableVersions.get(apiKey);
- if (usableVersion.isTooOld())
- bld.append(" [unusable: node too old]");
- else if (usableVersion.isTooNew())
+ ApiVersion supportedVersion = supportedVersions.get(apiKey);
+ if (apiKey.latestVersion() < supportedVersion.minVersion) {
bld.append(" [unusable: node too new]");
- else
- bld.append(" [usable: ").append(usableVersion.value).append("]");
+ } else if (supportedVersion.maxVersion < apiKey.oldestVersion()) {
+ bld.append(" [unusable: node too old]");
+ } else {
+ short latestUsableVersion = Utils.min(apiKey.latestVersion(), supportedVersion.maxVersion);
+ bld.append(" [usable: ").append(latestUsableVersion).append("]");
+ }
}
return bld.toString();
}
@@ -184,50 +197,7 @@ public class NodeApiVersions {
* @return The api version information from the broker or null if it is unsupported
*/
public ApiVersion apiVersion(ApiKeys apiKey) {
- UsableVersion usableVersion = usableVersions.get(apiKey);
- if (usableVersion == null)
- return null;
- return usableVersion.apiVersion;
- }
-
- private static class UsableVersion {
- private static final short NODE_TOO_OLD = (short) -1;
- private static final short NODE_TOO_NEW = (short) -2;
-
- private final ApiKeys apiKey;
- private final ApiVersion apiVersion;
- private final Short value;
-
- private UsableVersion(ApiKeys apiKey, ApiVersion nodeApiVersion) {
- this.apiKey = apiKey;
- this.apiVersion = nodeApiVersion;
- short v = Utils.min(apiKey.latestVersion(), nodeApiVersion.maxVersion);
- if (v < nodeApiVersion.minVersion) {
- this.value = NODE_TOO_NEW;
- } else if (v < apiKey.oldestVersion()) {
- this.value = NODE_TOO_OLD;
- } else {
- this.value = v;
- }
- }
-
- private boolean isTooOld() {
- return value == NODE_TOO_OLD;
- }
-
- private boolean isTooNew() {
- return value == NODE_TOO_NEW;
- }
-
- private short latestSupportedVersion(short minAllowedVersion, short maxAllowedVersion) {
- short minVersion = (short) Math.max(minAllowedVersion, apiVersion.minVersion);
- short maxVersion = (short) Math.min(maxAllowedVersion, apiVersion.maxVersion);
- if (minVersion > maxVersion)
- throw new UnsupportedVersionException("The broker does not support " + apiKey +
- " with version in range [" + minAllowedVersion + "," + maxAllowedVersion + "]. The supported" +
- " range is [" + apiVersion.minVersion + "," + apiVersion.maxVersion + "].");
- return maxVersion;
- }
+ return supportedVersions.get(apiKey);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index bbf13d6..da1d147 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -30,14 +30,23 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
private final short oldestAllowedVersion;
private final short latestAllowedVersion;
+ /**
+ * Construct a new builder which allows any supported version
+ */
public Builder(ApiKeys apiKey) {
this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
}
- public Builder(ApiKeys apiKey, short desiredVersion) {
- this(apiKey, desiredVersion, desiredVersion);
+ /**
+ * Construct a new builder which allows only a specific version
+ */
+ public Builder(ApiKeys apiKey, short allowedVersion) {
+ this(apiKey, allowedVersion, allowedVersion);
}
+ /**
+ * Construct a new builder which allows an inclusive range of versions
+ */
public Builder(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
this.apiKey = apiKey;
this.oldestAllowedVersion = oldestAllowedVersion;
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 1a9e553..0315734 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -213,9 +213,9 @@ public class FetchRequest extends AbstractRequest {
maxWait, minBytes, fetchData, isolationLevel);
}
- public static Builder forReplica(short desiredVersion, int replicaId, int maxWait, int minBytes,
+ public static Builder forReplica(short allowedVersion, int replicaId, int maxWait, int minBytes,
LinkedHashMap<TopicPartition, PartitionData> fetchData) {
- return new Builder(desiredVersion, desiredVersion, replicaId, maxWait, minBytes, fetchData,
+ return new Builder(allowedVersion, allowedVersion, replicaId, maxWait, minBytes, fetchData,
IsolationLevel.READ_UNCOMMITTED);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index e252d3d..98f53bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -107,8 +107,8 @@ public class ListOffsetRequest extends AbstractRequest {
private Map<TopicPartition, PartitionData> offsetData = null;
private Map<TopicPartition, Long> partitionTimestamps = null;
- public static Builder forReplica(short desiredVersion, int replicaId) {
- return new Builder((short) 0, desiredVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
+ public static Builder forReplica(short allowedVersion, int replicaId) {
+ return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
}
public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
@@ -120,8 +120,8 @@ public class ListOffsetRequest extends AbstractRequest {
return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
}
- private Builder(short minVersion, short maxVersion, int replicaId, IsolationLevel isolationLevel) {
- super(ApiKeys.LIST_OFFSETS, minVersion, maxVersion);
+ private Builder(short oldestAllowedVersion, short latestAllowedVersion, int replicaId, IsolationLevel isolationLevel) {
+ super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, latestAllowedVersion);
this.replicaId = replicaId;
this.isolationLevel = isolationLevel;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 4037112..8b33472 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -165,7 +165,7 @@ public class MockClient implements KafkaClient {
continue;
AbstractRequest.Builder<?> builder = request.requestBuilder();
- short version = nodeApiVersions.usableVersion(request.apiKey(), builder.oldestAllowedVersion(),
+ short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
AbstractRequest abstractRequest = request.requestBuilder().build(version);
if (!futureResp.requestMatcher.matches(abstractRequest))
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 266d2c9..88d0c2e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -91,46 +91,46 @@ public class NodeApiVersionsTest {
}
@Test
- public void testDesiredVersion() {
+ public void testLatestUsableVersion() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
- assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE));
- assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1));
- assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 1));
- assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 2));
- assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 3));
- assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2, (short) 2));
- assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2, (short) 3));
- assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 3));
- assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4));
+ assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE));
+ assertEquals(1, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1));
+ assertEquals(1, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 1, (short) 1));
+ assertEquals(2, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 1, (short) 2));
+ assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 1, (short) 3));
+ assertEquals(2, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 2, (short) 2));
+ assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 2, (short) 3));
+ assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 3));
+ assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4));
}
@Test(expected = UnsupportedVersionException.class)
- public void testDesiredVersionTooLarge() {
+ public void testLatestUsableVersionOutOfRangeLow() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 2)));
- apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4);
+ apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4);
}
@Test(expected = UnsupportedVersionException.class)
- public void testDesiredVersionTooSmall() {
+ public void testLatestUsableVersionOutOfRangeHigh() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
new ApiVersion(ApiKeys.PRODUCE.id, (short) 2, (short) 3)));
- apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1);
+ apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1);
}
@Test(expected = UnsupportedVersionException.class)
public void testUsableVersionCalculationNoKnownVersions() {
List<ApiVersion> versionList = new ArrayList<>();
NodeApiVersions versions = new NodeApiVersions(versionList);
- versions.usableVersion(ApiKeys.FETCH);
+ versions.latestUsableVersion(ApiKeys.FETCH);
}
@Test(expected = UnsupportedVersionException.class)
- public void testUsableVersionOutOfRange() {
+ public void testLatestUsableVersionOutOfRange() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
new ApiVersion(ApiKeys.PRODUCE.id, (short) 300, (short) 300)));
- apiVersions.usableVersion(ApiKeys.PRODUCE);
+ apiVersions.latestUsableVersion(ApiKeys.PRODUCE);
}
@Test
@@ -143,7 +143,7 @@ public class NodeApiVersionsTest {
versionList.add(new ApiVersion((short) 100, (short) 0, (short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList);
for (ApiKeys apiKey: ApiKeys.values()) {
- assertEquals(apiKey.latestVersion(), versions.usableVersion(apiKey));
+ assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 00a7c9f..1f4b5e2 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -49,7 +49,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
val terminator = if (apiKey == ApiKeys.values.last) "" else ","
- val usableVersion = nodeApiVersions.usableVersion(apiKey)
+ val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
assertTrue(lineIter.hasNext)
assertEquals(line, lineIter.next)
http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index c9c40a9..2839137 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -216,7 +216,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
val hostStr = s"${node.host}:${node.port}"
assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
val brokerVersionInfo = tryBrokerVersionInfo.get
- assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+ assertEquals(1, brokerVersionInfo.latestUsableVersion(ApiKeys.API_VERSIONS))
}
}