Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:44 UTC

[30/50] [abbrv] kafka git commit: KAFKA-3307; Add ApiVersions Request/Response and server side handling.

KAFKA-3307; Add ApiVersions Request/Response and server side handling.

The patch does the following:
1. Adds ApiVersionsRequest/Response.
2. Adds the UNSUPPORTED_VERSION error and UnsupportedVersionException.
3. Adds broker-side handling of ApiVersionsRequest.
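
For orientation, the exchange these additions enable looks roughly like the sketch below. This is an illustrative client-side view written against only the classes added in this patch (ApiVersionsRequest, ApiVersionsResponse, ApiKeys, Errors); the readResponseFrom helper is hypothetical and stands in for whatever network layer carries the bytes.

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.ApiVersionsRequest;
    import org.apache.kafka.common.requests.ApiVersionsResponse;

    public class ApiVersionsExchangeSketch {
        // Hypothetical helper: send the request and return the raw response body.
        static ByteBuffer readResponseFrom(String broker, ApiVersionsRequest request) {
            throw new UnsupportedOperationException("network layer not shown");
        }

        public static void main(String[] args) {
            ApiVersionsRequest request = new ApiVersionsRequest();   // the v0 request has an empty body
            ByteBuffer body = readResponseFrom("broker:9092", request);
            ApiVersionsResponse response = ApiVersionsResponse.parse(body);

            if (response.errorCode() == Errors.UNSUPPORTED_VERSION.code()) {
                // The broker rejected the ApiVersionsRequest version we sent.
                return;
            }
            // Look up the supported version range for a specific API, e.g. Fetch.
            ApiVersionsResponse.ApiVersion fetch = response.apiVersion(ApiKeys.FETCH.id);
            System.out.printf("Fetch supported from v%d to v%d%n", fetch.minVersion, fetch.maxVersion);
        }
    }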

Author: Ashish Singh <as...@cloudera.com>

Reviewers: Gwen Shapira, Ismael Juma, Magnus Edenhill

Closes #986 from SinghAsDev/KAFKA-3307


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8407dac6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8407dac6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8407dac6

Branch: refs/heads/0.10.0
Commit: 8407dac6ee409d832c95533e6f1d5578511232ae
Parents: 4c76b5f
Author: Ashish Singh <as...@cloudera.com>
Authored: Wed Apr 27 11:28:32 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Apr 27 11:28:32 2016 -0700

----------------------------------------------------------------------
 .../errors/UnsupportedVersionException.java     |  25 ++++
 .../apache/kafka/common/protocol/ApiKeys.java   |   5 +-
 .../apache/kafka/common/protocol/Errors.java    |   5 +-
 .../apache/kafka/common/protocol/Protocol.java  |  41 ++++++-
 .../kafka/common/requests/AbstractRequest.java  |   4 +-
 .../common/requests/ApiVersionsRequest.java     |  55 +++++++++
 .../common/requests/ApiVersionsResponse.java    | 116 +++++++++++++++++++
 .../common/requests/RequestResponseTest.java    |  16 ++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  77 +++++++-----
 .../unit/kafka/server/ApiVersionsTest.scala     |  51 ++++++++
 10 files changed, 357 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
new file mode 100644
index 0000000..3679be4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class UnsupportedVersionException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public UnsupportedVersionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public UnsupportedVersionException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 512a121..aeb0b45 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -37,7 +37,8 @@ public enum ApiKeys {
     SYNC_GROUP(14, "SyncGroup"),
     DESCRIBE_GROUPS(15, "DescribeGroups"),
     LIST_GROUPS(16, "ListGroups"),
-    SASL_HANDSHAKE(17, "SaslHandshake");
+    SASL_HANDSHAKE(17, "SaslHandshake"),
+    API_VERSIONS(18, "ApiVersions");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
@@ -97,4 +98,4 @@ public enum ApiKeys {
         System.out.println(toHtml());
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 9013399..64a709e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -136,7 +137,9 @@ public enum Errors {
     UNSUPPORTED_SASL_MECHANISM(33,
             new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")),
     ILLEGAL_SASL_STATE(34,
-            new IllegalSaslStateException("Request is not valid given the current SASL state."));
+            new IllegalSaslStateException("Request is not valid given the current SASL state.")),
+    UNSUPPORTED_VERSION(35,
+            new UnsupportedVersionException("The version of the API is not supported."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index d322095..99cdbf9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -759,10 +759,24 @@ public class Protocol {
     public static final Schema[] SASL_HANDSHAKE_REQUEST = new Schema[] {SASL_HANDSHAKE_REQUEST_V0};
     public static final Schema[] SASL_HANDSHAKE_RESPONSE = new Schema[] {SASL_HANDSHAKE_RESPONSE_V0};
 
+    /* ApiVersion api */
+    public static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
+
+    public static final Schema API_VERSIONS_V0 = new Schema(new Field("api_key", INT16, "API key."),
+                                                           new Field("min_version", INT16, "Minimum supported version."),
+                                                           new Field("max_version", INT16, "Maximum supported version."));
+
+    public static final Schema API_VERSIONS_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."),
+                                                                    new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."));
+
+    public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0};
+    public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
+    public static final short[] MIN_VERSIONS = new short[ApiKeys.MAX_API_KEY + 1];
 
     /* the latest version of each api */
     public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
@@ -786,6 +800,7 @@ public class Protocol {
         REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST;
         REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST;
         REQUESTS[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_REQUEST;
+        REQUESTS[ApiKeys.API_VERSIONS.id] = API_VERSIONS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -805,16 +820,32 @@ public class Protocol {
         RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE;
         RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE;
         RESPONSES[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_RESPONSE;
+        RESPONSES[ApiKeys.API_VERSIONS.id] = API_VERSIONS_RESPONSE;
 
-        /* set the maximum version of each api */
-        for (ApiKeys api : ApiKeys.values())
+        /* set the minimum and maximum version of each api */
+        for (ApiKeys api : ApiKeys.values()) {
             CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
+            for (int i = 0; i < REQUESTS[api.id].length; ++i)
+                if (REQUESTS[api.id][i] != null) {
+                    MIN_VERSIONS[api.id] = (short) i;
+                    break;
+                }
+        }
 
-        /* sanity check that we have the same number of request and response versions for each api */
-        for (ApiKeys api : ApiKeys.values())
+        /* sanity check that:
+         *   - we have the same number of request and response versions for each api
+         *   - we have a consistent set of request and response versions for each api */
+        for (ApiKeys api : ApiKeys.values()) {
             if (REQUESTS[api.id].length != RESPONSES[api.id].length)
                 throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name
                         + " but " + RESPONSES[api.id].length + " response versions.");
+
+            for (int i = 0; i < REQUESTS[api.id].length; ++i)
+                if ((REQUESTS[api.id][i] == null && RESPONSES[api.id][i] != null) ||
+                        (REQUESTS[api.id][i] != null && RESPONSES[api.id][i] == null))
+                    throw new IllegalStateException("Request and response for version " + i + " of API "
+                            + api.id + " are defined inconsistently. One is null while the other is not null.");
+        }
     }
 
     private static String indentString(int size) {
@@ -977,4 +1008,4 @@ public class Protocol {
         System.out.println(toHtml());
     }
 
-}
+}
\ No newline at end of file
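
The MIN_VERSIONS table computed above, together with the existing CURR_VERSION table, is what the broker-side handler later uses to decide whether a request's version is supported. A minimal sketch of that check, assuming only the public fields defined in this file:

    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.Protocol;

    public class VersionBoundsSketch {
        // True if the given version of the API falls inside the broker's supported range.
        static boolean isSupported(ApiKeys api, short version) {
            return version >= Protocol.MIN_VERSIONS[api.id]
                && version <= Protocol.CURR_VERSION[api.id];
        }

        public static void main(String[] args) {
            System.out.println(isSupported(ApiKeys.API_VERSIONS, (short) 0)); // true: v0 is defined above
            System.out.println(isSupported(ApiKeys.API_VERSIONS, (short) 5)); // false until newer versions are added
        }
    }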

http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 89c2ce1..ab61c66 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -74,9 +74,11 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return ListGroupsRequest.parse(buffer, versionId);
             case SASL_HANDSHAKE:
                 return SaslHandshakeRequest.parse(buffer, versionId);
+            case API_VERSIONS:
+                return ApiVersionsRequest.parse(buffer, versionId);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
new file mode 100644
index 0000000..b78c759
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+public class ApiVersionsRequest extends AbstractRequest {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.API_VERSIONS.id);
+
+    public ApiVersionsRequest() {
+        super(new Struct(CURRENT_SCHEMA));
+    }
+
+    public ApiVersionsRequest(Struct struct) {
+        super(struct);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                short errorCode = Errors.forException(e).code();
+                return new ApiVersionsResponse(errorCode, Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.API_VERSIONS.id)));
+        }
+    }
+
+    public static ApiVersionsRequest parse(ByteBuffer buffer, int versionId) {
+        return new ApiVersionsRequest(ProtoUtils.parseRequest(ApiKeys.API_VERSIONS.id, versionId, buffer));
+    }
+
+    public static ApiVersionsRequest parse(ByteBuffer buffer) {
+        return new ApiVersionsRequest(CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
new file mode 100644
index 0000000..36881a3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ApiVersionsResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id);
+
+    public static final String ERROR_CODE_KEY_NAME = "error_code";
+    public static final String API_VERSIONS_KEY_NAME = "api_versions";
+    public static final String API_KEY_NAME = "api_key";
+    public static final String MIN_VERSION_KEY_NAME = "min_version";
+    public static final String MAX_VERSION_KEY_NAME = "max_version";
+
+    /**
+     * Possible error codes:
+     *
+     * UNSUPPORTED_VERSION (35)
+     */
+    private final short errorCode;
+    private final Map<Short, ApiVersion> apiKeyToApiVersion;
+
+    public static final class ApiVersion {
+        public final short apiKey;
+        public final short minVersion;
+        public final short maxVersion;
+
+        public ApiVersion(short apiKey, short minVersion, short maxVersion) {
+            this.apiKey = apiKey;
+            this.minVersion = minVersion;
+            this.maxVersion = maxVersion;
+        }
+    }
+
+    public ApiVersionsResponse(short errorCode, List<ApiVersion> apiVersions) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        List<Struct> apiVersionList = new ArrayList<>();
+        for (ApiVersion apiVersion : apiVersions) {
+            Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME);
+            apiVersionStruct.set(API_KEY_NAME, apiVersion.apiKey);
+            apiVersionStruct.set(MIN_VERSION_KEY_NAME, apiVersion.minVersion);
+            apiVersionStruct.set(MAX_VERSION_KEY_NAME, apiVersion.maxVersion);
+            apiVersionList.add(apiVersionStruct);
+        }
+        struct.set(API_VERSIONS_KEY_NAME, apiVersionList.toArray());
+        this.errorCode = errorCode;
+        this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions);
+    }
+
+    public ApiVersionsResponse(Struct struct) {
+        super(struct);
+        this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        List<ApiVersion> tempApiVersions = new ArrayList<>();
+        for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) {
+            Struct apiVersionStruct = (Struct) apiVersionsObj;
+            short apiKey = apiVersionStruct.getShort(API_KEY_NAME);
+            short minVersion = apiVersionStruct.getShort(MIN_VERSION_KEY_NAME);
+            short maxVersion = apiVersionStruct.getShort(MAX_VERSION_KEY_NAME);
+            tempApiVersions.add(new ApiVersion(apiKey, minVersion, maxVersion));
+        }
+        this.apiKeyToApiVersion = buildApiKeyToApiVersion(tempApiVersions);
+    }
+
+    public Collection<ApiVersion> apiVersions() {
+        return apiKeyToApiVersion.values();
+    }
+
+    public ApiVersion apiVersion(short apiKey) {
+        return apiKeyToApiVersion.get(apiKey);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public static ApiVersionsResponse parse(ByteBuffer buffer) {
+        return new ApiVersionsResponse(CURRENT_SCHEMA.read(buffer));
+    }
+
+    public static ApiVersionsResponse fromError(Errors error) {
+        return new ApiVersionsResponse(error.code(), Collections.<ApiVersion>emptyList());
+    }
+
+    private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {
+        Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
+        for (ApiVersion apiVersion: apiVersions) {
+            tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion);
+        }
+        return tempApiIdToApiVersion;
+    }
+}
\ No newline at end of file
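
A hedged sketch of how a complete response covering every API could be assembled from this class together with ApiKeys and Protocol; the patch does exactly this in Scala inside object KafkaApis further down, so the Java below is illustrative only:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.protocol.Protocol;
    import org.apache.kafka.common.requests.ApiVersionsResponse;

    public class BuildApiVersionsResponseSketch {
        public static void main(String[] args) {
            List<ApiVersionsResponse.ApiVersion> versions = new ArrayList<>();
            for (ApiKeys api : ApiKeys.values()) {
                // One entry per API key, bounded by the oldest and newest schema versions the broker knows.
                versions.add(new ApiVersionsResponse.ApiVersion(
                        api.id, Protocol.MIN_VERSIONS[api.id], Protocol.CURR_VERSION[api.id]));
            }
            ApiVersionsResponse response = new ApiVersionsResponse(Errors.NONE.code(), versions);
            System.out.println("Advertising " + response.apiVersions().size() + " APIs");
        }
    }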

http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0018f53..345de3f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -95,7 +95,10 @@ public class RequestResponseTest {
                 createLeaderAndIsrResponse(),
                 createSaslHandshakeRequest(),
                 createSaslHandshakeRequest().getErrorResponse(0, new UnknownServerException()),
-                createSaslHandshakeResponse()
+                createSaslHandshakeResponse(),
+                createApiVersionRequest(),
+                createApiVersionRequest().getErrorResponse(0, new UnknownServerException()),
+                createApiVersionResponse()
         );
 
         for (AbstractRequestResponse req : requestResponseList)
@@ -438,4 +441,13 @@ public class RequestResponseTest {
     private AbstractRequestResponse createSaslHandshakeResponse() {
         return new SaslHandshakeResponse(Errors.NONE.code(), Collections.singletonList("GSSAPI"));
     }
-}
+
+    private AbstractRequest createApiVersionRequest() {
+        return new ApiVersionsRequest();
+    }
+
+    private AbstractRequestResponse createApiVersionResponse() {
+        List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
+        return new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 406b1bd..67d46fc 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import java.lang.{Long => JLong, Short => JShort}
 import java.util.Properties
 
-import kafka.admin.{RackAwareMode, AdminUtils}
+import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common
@@ -31,27 +31,32 @@ import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
 import kafka.log._
 import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.network._
-import kafka.network.RequestChannel.{Session, Response}
-import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
+import kafka.network.RequestChannel.{Response, Session}
+import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Topic, Write}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException,
-ClusterAuthorizationException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
-DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
-LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
-StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse,
-MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
+import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{TopicPartition, Node}
+import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.internals.TopicConstants
 
 import scala.collection._
 import scala.collection.JavaConverters._
 import org.apache.kafka.common.requests.SaslHandshakeResponse
 
+object KafkaApis {
+  val apiVersionsResponse = new ApiVersionsResponse(Errors.NONE.code, buildApiKeysToApiVersions.values.toList.asJava)
+
+  private def buildApiKeysToApiVersions: Map[Short, ApiVersionsResponse.ApiVersion] = {
+    ApiKeys.values.map(apiKey =>
+      apiKey.id -> new ApiVersionsResponse.ApiVersion(apiKey.id, Protocol.MIN_VERSIONS(apiKey.id), Protocol.CURR_VERSION(apiKey.id))).toMap
+  }
+}
+
+
 /**
  * Logic to handle the various Kafka requests
  */
@@ -74,7 +79,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Top-level method that handles all requests and multiplexes to the right api
    */
   def handle(request: RequestChannel.Request) {
-    try{
+    try {
       trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
         format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
       ApiKeys.forId(request.requestId) match {
@@ -96,6 +101,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
+        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -143,7 +149,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       val responseHeader = new ResponseHeader(correlationId)
-      val leaderAndIsrResponse=
+      val leaderAndIsrResponse =
         if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
           val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
           new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
@@ -234,7 +240,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
 
-      val (authorizedRequestInfo, unauthorizedRequestInfo) =  filteredRequestInfo.partition {
+      val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
         case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
       }
 
@@ -251,7 +257,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
 
         val responseHeader = new ResponseHeader(header.correlationId)
-        val responseBody =  new OffsetCommitResponse(combinedCommitStatus.asJava)
+        val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
         requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
       }
 
@@ -376,7 +382,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val respHeader = new ResponseHeader(request.header.correlationId)
           val respBody = request.header.apiVersion match {
             case 0 => new ProduceResponse(mergedResponseStatus.asJava)
-            case version@ (1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
+            case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
             // This case shouldn't happen unless a new version of ProducerRequest is added without
             // updating this part of the code to handle it properly.
             case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
@@ -426,7 +432,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
 
-    val (authorizedRequestInfo, unauthorizedRequestInfo) =  fetchRequest.requestInfo.partition {
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
       case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
     }
 
@@ -552,14 +558,14 @@ class KafkaApis(val requestChannel: RequestChannel,
         case utpe: UnknownTopicOrPartitionException =>
           debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
                correlationId, clientId, topicPartition, utpe.getMessage))
-          (topicPartition,  new ListOffsetResponse.PartitionData(Errors.forException(utpe).code, List[JLong]().asJava))
+          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(utpe).code, List[JLong]().asJava))
         case nle: NotLeaderForPartitionException =>
           debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
                correlationId, clientId, topicPartition,nle.getMessage))
-          (topicPartition,  new ListOffsetResponse.PartitionData(Errors.forException(nle).code, List[JLong]().asJava))
+          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(nle).code, List[JLong]().asJava))
         case e: Throwable =>
           error("Error while responding to offset request", e)
-          (topicPartition,  new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
+          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
       }
     })
 
@@ -591,7 +597,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     else
       offsetTimeArray = new Array[(Long, Long)](segsArray.length)
 
-    for(i <- 0 until segsArray.length)
+    for (i <- 0 until segsArray.length)
       offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
     if (segsArray.last.size > 0)
       offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
@@ -610,18 +616,18 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (offsetTimeArray(startIndex)._2 <= timestamp)
             isFound = true
           else
-            startIndex -=1
+            startIndex -= 1
         }
     }
 
     val retSize = maxNumOffsets.min(startIndex + 1)
     val ret = new Array[Long](retSize)
-    for(j <- 0 until retSize) {
+    for (j <- 0 until retSize) {
       ret(j) = offsetTimeArray(startIndex)._1
       startIndex -= 1
     }
     // ensure that the returned seq is in descending order of offsets
-    ret.toSeq.sortBy(- _)
+    ret.toSeq.sortBy(-_)
   }
 
   private def createTopic(topic: String,
@@ -871,7 +877,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
     } else {
       val (error, groups) = coordinator.handleListGroups()
-      val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
+      val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
       new ListGroupsResponse(error.code, allGroups.asJava)
     }
     requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
@@ -1024,6 +1030,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
   }
 
+  def handleApiVersionsRequest(request: RequestChannel.Request) {
+    // Note that the broker returns its full list of supported ApiKeys and versions regardless of the current
+    // authentication state (e.g., before SASL authentication on a SASL listener; note also that no Kafka
+    // protocol requests can take place on an SSL listener before the SSL handshake is finished).
+    // If this is considered to leak information about the broker version, a workaround is to use SSL
+    // with client authentication, which is performed at an earlier stage of the connection, where the
+    // ApiVersionsRequest is not available.
+    val responseHeader = new ResponseHeader(request.header.correlationId)
+    val isApiVersionsRequestVersionSupported = request.header.apiVersion <= Protocol.CURR_VERSION(ApiKeys.API_VERSIONS.id) &&
+                                              request.header.apiVersion >= Protocol.MIN_VERSIONS(ApiKeys.API_VERSIONS.id)
+    val responseBody = if (isApiVersionsRequestVersionSupported)
+      KafkaApis.apiVersionsResponse
+    else
+      ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+  }
+
   def close() {
     quotaManagers.foreach { case (apiKey, quotaManager) =>
       quotaManager.shutdown()
@@ -1035,4 +1058,4 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
new file mode 100644
index 0000000..4429f26
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
@@ -0,0 +1,51 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package unit.kafka.server
+
+import kafka.server.KafkaApis
+import org.apache.kafka.common.protocol.{Protocol, ApiKeys}
+import org.junit.Assert._
+import org.junit.Test
+
+class ApiVersionsTest {
+
+  @Test
+  def testApiVersions {
+    val apiVersions = KafkaApis.apiVersionsResponse.apiVersions
+    assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length)
+
+    for (key <- ApiKeys.values) {
+      val version = KafkaApis.apiVersionsResponse.apiVersion(key.id)
+      assertNotNull(s"Could not find ApiVersion for API ${key.name}", version)
+      assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, Protocol.MIN_VERSIONS(key.id))
+      assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, Protocol.CURR_VERSION(key.id))
+
+      // Check that versions below the min version are indeed null, i.e., deprecated.
+      for (i <- 0 until version.minVersion) {
+        assertNull(s"Request version $i for API ${version.apiKey} must be null.", Protocol.REQUESTS(version.apiKey)(i))
+        assertNull(s"Response version $i for API ${version.apiKey} must be null.", Protocol.RESPONSES(version.apiKey)(i))
+      }
+
+      // Check that versions between the min and max versions are non-null, i.e., valid.
+      for (i <- version.minVersion.toInt to version.maxVersion) {
+        assertNotNull(s"Request version $i for API ${version.apiKey} must not be null.", Protocol.REQUESTS(version.apiKey)(i))
+        assertNotNull(s"Response version $i for API ${version.apiKey} must not be null.", Protocol.RESPONSES(version.apiKey)(i))
+      }
+    }
+  }
+}
\ No newline at end of file