You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/28 09:57:24 UTC

kafka git commit: KAFKA-5960; MINOR: Follow-up cleanup of

Repository: kafka
Updated Branches:
  refs/heads/trunk e1543a5a8 -> 675e54f9e


KAFKA-5960; MINOR: Follow-up cleanup of

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3976 from hachikuji/version-fix-followup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/675e54f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/675e54f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/675e54f9

Branch: refs/heads/trunk
Commit: 675e54f9efefc7150b88ebf508bcc67303fe0c57
Parents: e1543a5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Sep 28 10:57:18 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Sep 28 10:57:18 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ApiVersions.java   |  2 +-
 .../org/apache/kafka/clients/NetworkClient.java |  2 +-
 .../apache/kafka/clients/NodeApiVersions.java   | 88 +++++++-------------
 .../kafka/common/requests/AbstractRequest.java  | 13 ++-
 .../kafka/common/requests/FetchRequest.java     |  4 +-
 .../common/requests/ListOffsetRequest.java      |  8 +-
 .../org/apache/kafka/clients/MockClient.java    |  2 +-
 .../kafka/clients/NodeApiVersionsTest.java      | 36 ++++----
 .../admin/BrokerApiVersionsCommandTest.scala    |  2 +-
 .../kafka/api/LegacyAdminClientTest.scala       |  2 +-
 10 files changed, 69 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
index 9c61ff2..8001f1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
@@ -53,7 +53,7 @@ public class ApiVersions {
         // we will need to convert the messages when they are ready to be sent.
         byte maxUsableMagic = RecordBatch.CURRENT_MAGIC_VALUE;
         for (NodeApiVersions versions : this.nodeApiVersions.values()) {
-            byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.usableVersion(ApiKeys.PRODUCE));
+            byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE));
             maxUsableMagic = (byte) Math.min(nodeMaxUsableMagic, maxUsableMagic);
         }
         return maxUsableMagic;

http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0fbaff7..ee7258a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -389,7 +389,7 @@ public class NetworkClient implements KafkaClient {
                     log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                             "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
             } else {
-                version = versionInfo.usableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
+                version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                         builder.latestAllowedVersion());
             }
             // The call to build may also throw UnsupportedVersionException, if there are essential

http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 93a5c72..c8b3f44 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -35,7 +35,7 @@ import java.util.TreeMap;
  */
 public class NodeApiVersions {
     // A map of the usable versions of each API, keyed by the ApiKeys instance
-    private final Map<ApiKeys, UsableVersion> usableVersions = new EnumMap<>(ApiKeys.class);
+    private final Map<ApiKeys, ApiVersion> supportedVersions = new EnumMap<>(ApiKeys.class);
 
     // List of APIs which the broker supports, but which are unknown to the client
     private final List<ApiVersion> unknownApis = new ArrayList<>();
@@ -77,7 +77,7 @@ public class NodeApiVersions {
         for (ApiVersion nodeApiVersion : nodeApiVersions) {
             if (ApiKeys.hasId(nodeApiVersion.apiKey)) {
                 ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey);
-                usableVersions.put(nodeApiKey, new UsableVersion(nodeApiKey, nodeApiVersion));
+                supportedVersions.put(nodeApiKey, nodeApiVersion);
             } else {
                 // Newer brokers may support ApiKeys we don't know about
                 unknownApis.add(nodeApiVersion);
@@ -88,18 +88,29 @@ public class NodeApiVersions {
     /**
      * Return the most recent version supported by both the node and the local software.
      */
-    public short usableVersion(ApiKeys apiKey) {
-        return usableVersion(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
+    public short latestUsableVersion(ApiKeys apiKey) {
+        return latestUsableVersion(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
     }
 
     /**
      * Get the latest version supported by the broker within an allowed range of versions
      */
-    public short usableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
-        UsableVersion usableVersion = usableVersions.get(apiKey);
+    public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
+        ApiVersion usableVersion = supportedVersions.get(apiKey);
         if (usableVersion == null)
             throw new UnsupportedVersionException("The broker does not support " + apiKey);
-        return usableVersion.latestSupportedVersion(oldestAllowedVersion, latestAllowedVersion);
+        return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion);
+    }
+
+    private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions,
+                                      short minAllowedVersion, short maxAllowedVersion) {
+        short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
+        short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
+        if (minVersion > maxVersion)
+            throw new UnsupportedVersionException("The broker does not support " + apiKey +
+                    " with version in range [" + minAllowedVersion + "," + maxAllowedVersion + "]. The supported" +
+                    " range is [" + supportedVersions.minVersion + "," + supportedVersions.maxVersion + "].");
+        return maxVersion;
     }
 
     /**
@@ -122,8 +133,8 @@ public class NodeApiVersions {
         // a TreeMap before printing it out to ensure that we always print in
         // ascending order.
         TreeMap<Short, String> apiKeysText = new TreeMap<>();
-        for (UsableVersion usableVersion : this.usableVersions.values())
-            apiKeysText.put(usableVersion.apiVersion.apiKey, apiVersionToText(usableVersion.apiVersion));
+        for (ApiVersion supportedVersion : this.supportedVersions.values())
+            apiKeysText.put(supportedVersion.apiKey, apiVersionToText(supportedVersion));
         for (ApiVersion apiVersion : unknownApis)
             apiKeysText.put(apiVersion.apiKey, apiVersionToText(apiVersion));
 
@@ -166,13 +177,15 @@ public class NodeApiVersions {
         }
 
         if (apiKey != null) {
-            UsableVersion usableVersion = usableVersions.get(apiKey);
-            if (usableVersion.isTooOld())
-                bld.append(" [unusable: node too old]");
-            else if (usableVersion.isTooNew())
+            ApiVersion supportedVersion = supportedVersions.get(apiKey);
+            if (apiKey.latestVersion() < supportedVersion.minVersion) {
                 bld.append(" [unusable: node too new]");
-            else
-                bld.append(" [usable: ").append(usableVersion.value).append("]");
+            } else if (supportedVersion.maxVersion < apiKey.oldestVersion()) {
+                bld.append(" [unusable: node too old]");
+            } else {
+                short latestUsableVersion = Utils.min(apiKey.latestVersion(), supportedVersion.maxVersion);
+                bld.append(" [usable: ").append(latestUsableVersion).append("]");
+            }
         }
         return bld.toString();
     }
@@ -184,50 +197,7 @@ public class NodeApiVersions {
      * @return The api version information from the broker or null if it is unsupported
      */
     public ApiVersion apiVersion(ApiKeys apiKey) {
-        UsableVersion usableVersion = usableVersions.get(apiKey);
-        if (usableVersion == null)
-            return null;
-        return usableVersion.apiVersion;
-    }
-
-    private static class UsableVersion {
-        private static final short NODE_TOO_OLD = (short) -1;
-        private static final short NODE_TOO_NEW = (short) -2;
-
-        private final ApiKeys apiKey;
-        private final ApiVersion apiVersion;
-        private final Short value;
-
-        private UsableVersion(ApiKeys apiKey, ApiVersion nodeApiVersion) {
-            this.apiKey = apiKey;
-            this.apiVersion = nodeApiVersion;
-            short v = Utils.min(apiKey.latestVersion(), nodeApiVersion.maxVersion);
-            if (v < nodeApiVersion.minVersion) {
-                this.value = NODE_TOO_NEW;
-            } else if (v < apiKey.oldestVersion()) {
-                this.value = NODE_TOO_OLD;
-            } else {
-                this.value = v;
-            }
-        }
-
-        private boolean isTooOld() {
-            return value == NODE_TOO_OLD;
-        }
-
-        private boolean isTooNew() {
-            return value == NODE_TOO_NEW;
-        }
-
-        private short latestSupportedVersion(short minAllowedVersion, short maxAllowedVersion) {
-            short minVersion = (short) Math.max(minAllowedVersion, apiVersion.minVersion);
-            short maxVersion = (short) Math.min(maxAllowedVersion, apiVersion.maxVersion);
-            if (minVersion > maxVersion)
-                throw new UnsupportedVersionException("The broker does not support " + apiKey +
-                        " with version in range [" + minAllowedVersion + "," + maxAllowedVersion + "]. The supported" +
-                        " range is [" + apiVersion.minVersion + "," + apiVersion.maxVersion + "].");
-            return maxVersion;
-        }
+        return supportedVersions.get(apiKey);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index bbf13d6..da1d147 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -30,14 +30,23 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
         private final short oldestAllowedVersion;
         private final short latestAllowedVersion;
 
+        /**
+         * Construct a new builder which allows any supported version
+         */
         public Builder(ApiKeys apiKey) {
             this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
         }
 
-        public Builder(ApiKeys apiKey, short desiredVersion) {
-            this(apiKey, desiredVersion, desiredVersion);
+        /**
+         * Construct a new builder which allows only a specific version
+         */
+        public Builder(ApiKeys apiKey, short allowedVersion) {
+            this(apiKey, allowedVersion, allowedVersion);
         }
 
+        /**
+         * Construct a new builder which allows an inclusive range of versions
+         */
         public Builder(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
             this.apiKey = apiKey;
             this.oldestAllowedVersion = oldestAllowedVersion;

http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 1a9e553..0315734 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -213,9 +213,9 @@ public class FetchRequest extends AbstractRequest {
                     maxWait, minBytes, fetchData, isolationLevel);
         }
 
-        public static Builder forReplica(short desiredVersion, int replicaId, int maxWait, int minBytes,
+        public static Builder forReplica(short allowedVersion, int replicaId, int maxWait, int minBytes,
                                          LinkedHashMap<TopicPartition, PartitionData> fetchData) {
-            return new Builder(desiredVersion, desiredVersion, replicaId, maxWait, minBytes, fetchData,
+            return new Builder(allowedVersion, allowedVersion, replicaId, maxWait, minBytes, fetchData,
                     IsolationLevel.READ_UNCOMMITTED);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index e252d3d..98f53bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -107,8 +107,8 @@ public class ListOffsetRequest extends AbstractRequest {
         private Map<TopicPartition, PartitionData> offsetData = null;
         private Map<TopicPartition, Long> partitionTimestamps = null;
 
-        public static Builder forReplica(short desiredVersion, int replicaId) {
-            return new Builder((short) 0, desiredVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
+        public static Builder forReplica(short allowedVersion, int replicaId) {
+            return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
         }
 
         public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
@@ -120,8 +120,8 @@ public class ListOffsetRequest extends AbstractRequest {
             return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
         }
 
-        private Builder(short minVersion, short maxVersion, int replicaId, IsolationLevel isolationLevel) {
-            super(ApiKeys.LIST_OFFSETS, minVersion, maxVersion);
+        private Builder(short oldestAllowedVersion, short latestAllowedVersion, int replicaId, IsolationLevel isolationLevel) {
+            super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, latestAllowedVersion);
             this.replicaId = replicaId;
             this.isolationLevel = isolationLevel;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 4037112..8b33472 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -165,7 +165,7 @@ public class MockClient implements KafkaClient {
                 continue;
 
             AbstractRequest.Builder<?> builder = request.requestBuilder();
-            short version = nodeApiVersions.usableVersion(request.apiKey(), builder.oldestAllowedVersion(),
+            short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
                     builder.latestAllowedVersion());
             AbstractRequest abstractRequest = request.requestBuilder().build(version);
             if (!futureResp.requestMatcher.matches(abstractRequest))

http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 266d2c9..88d0c2e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -91,46 +91,46 @@ public class NodeApiVersionsTest {
     }
 
     @Test
-    public void testDesiredVersion() {
+    public void testLatestUsableVersion() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
                 new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
-        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE));
-        assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1));
-        assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 1));
-        assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 2));
-        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 3));
-        assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2, (short) 2));
-        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2, (short) 3));
-        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 3));
-        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4));
+        assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE));
+        assertEquals(1, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1));
+        assertEquals(1, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 1, (short) 1));
+        assertEquals(2, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 1, (short) 2));
+        assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 1, (short) 3));
+        assertEquals(2, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 2, (short) 2));
+        assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 2, (short) 3));
+        assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 3));
+        assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4));
     }
 
     @Test(expected = UnsupportedVersionException.class)
-    public void testDesiredVersionTooLarge() {
+    public void testLatestUsableVersionOutOfRangeLow() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
                 new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 2)));
-        apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4);
+        apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4);
     }
 
     @Test(expected = UnsupportedVersionException.class)
-    public void testDesiredVersionTooSmall() {
+    public void testLatestUsableVersionOutOfRangeHigh() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
                 new ApiVersion(ApiKeys.PRODUCE.id, (short) 2, (short) 3)));
-        apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1);
+        apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1);
     }
 
     @Test(expected = UnsupportedVersionException.class)
     public void testUsableVersionCalculationNoKnownVersions() {
         List<ApiVersion> versionList = new ArrayList<>();
         NodeApiVersions versions =  new NodeApiVersions(versionList);
-        versions.usableVersion(ApiKeys.FETCH);
+        versions.latestUsableVersion(ApiKeys.FETCH);
     }
 
     @Test(expected = UnsupportedVersionException.class)
-    public void testUsableVersionOutOfRange() {
+    public void testLatestUsableVersionOutOfRange() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
                 new ApiVersion(ApiKeys.PRODUCE.id, (short) 300, (short) 300)));
-        apiVersions.usableVersion(ApiKeys.PRODUCE);
+        apiVersions.latestUsableVersion(ApiKeys.PRODUCE);
     }
 
     @Test
@@ -143,7 +143,7 @@ public class NodeApiVersionsTest {
         versionList.add(new ApiVersion((short) 100, (short) 0, (short) 1));
         NodeApiVersions versions =  new NodeApiVersions(versionList);
         for (ApiKeys apiKey: ApiKeys.values()) {
-            assertEquals(apiKey.latestVersion(), versions.usableVersion(apiKey));
+            assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 00a7c9f..1f4b5e2 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -49,7 +49,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
         if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
         else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
       val terminator = if (apiKey == ApiKeys.values.last) "" else ","
-      val usableVersion = nodeApiVersions.usableVersion(apiKey)
+      val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
       val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
       assertTrue(lineIter.hasNext)
       assertEquals(line, lineIter.next)

http://git-wip-us.apache.org/repos/asf/kafka/blob/675e54f9/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index c9c40a9..2839137 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -216,7 +216,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
       val hostStr = s"${node.host}:${node.port}"
       assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
       val brokerVersionInfo = tryBrokerVersionInfo.get
-      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+      assertEquals(1, brokerVersionInfo.latestUsableVersion(ApiKeys.API_VERSIONS))
     }
   }