You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/27 20:28:37 UTC
kafka git commit: KAFKA-3307;
Add ApiVersions Request/Response and server side handling.
Repository: kafka
Updated Branches:
refs/heads/trunk 4c76b5fa6 -> 8407dac6e
KAFKA-3307; Add ApiVersions Request/Response and server side handling.
The patch does the following.
1. Adds ApiVersionsRequest/Response.
2. Adds UNSUPPORTED_VERSION error and UnsupportedVersionException.
3. Adds broker side handling of ApiVersionsRequest.
Author: Ashish Singh <as...@cloudera.com>
Reviewers: Gwen Shapira, Ismael Juma, Magnus Edenhill
Closes #986 from SinghAsDev/KAFKA-3307
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8407dac6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8407dac6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8407dac6
Branch: refs/heads/trunk
Commit: 8407dac6ee409d832c95533e6f1d5578511232ae
Parents: 4c76b5f
Author: Ashish Singh <as...@cloudera.com>
Authored: Wed Apr 27 11:28:32 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Apr 27 11:28:32 2016 -0700
----------------------------------------------------------------------
.../errors/UnsupportedVersionException.java | 25 ++++
.../apache/kafka/common/protocol/ApiKeys.java | 5 +-
.../apache/kafka/common/protocol/Errors.java | 5 +-
.../apache/kafka/common/protocol/Protocol.java | 41 ++++++-
.../kafka/common/requests/AbstractRequest.java | 4 +-
.../common/requests/ApiVersionsRequest.java | 55 +++++++++
.../common/requests/ApiVersionsResponse.java | 116 +++++++++++++++++++
.../common/requests/RequestResponseTest.java | 16 ++-
.../src/main/scala/kafka/server/KafkaApis.scala | 77 +++++++-----
.../unit/kafka/server/ApiVersionsTest.scala | 51 ++++++++
10 files changed, 357 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
new file mode 100644
index 0000000..3679be4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class UnsupportedVersionException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public UnsupportedVersionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public UnsupportedVersionException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 512a121..aeb0b45 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -37,7 +37,8 @@ public enum ApiKeys {
SYNC_GROUP(14, "SyncGroup"),
DESCRIBE_GROUPS(15, "DescribeGroups"),
LIST_GROUPS(16, "ListGroups"),
- SASL_HANDSHAKE(17, "SaslHandshake");
+ SASL_HANDSHAKE(17, "SaslHandshake"),
+ API_VERSIONS(18, "ApiVersions");
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
@@ -97,4 +98,4 @@ public enum ApiKeys {
System.out.println(toHtml());
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 9013399..64a709e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -136,7 +137,9 @@ public enum Errors {
UNSUPPORTED_SASL_MECHANISM(33,
new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")),
ILLEGAL_SASL_STATE(34,
- new IllegalSaslStateException("Request is not valid given the current SASL state."));
+ new IllegalSaslStateException("Request is not valid given the current SASL state.")),
+ UNSUPPORTED_VERSION(35,
+ new UnsupportedVersionException("The version of API is not supported."));
private static final Logger log = LoggerFactory.getLogger(Errors.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index d322095..99cdbf9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -759,10 +759,24 @@ public class Protocol {
public static final Schema[] SASL_HANDSHAKE_REQUEST = new Schema[] {SASL_HANDSHAKE_REQUEST_V0};
public static final Schema[] SASL_HANDSHAKE_RESPONSE = new Schema[] {SASL_HANDSHAKE_RESPONSE_V0};
+ /* ApiVersions api */
+ public static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
+
+ public static final Schema API_VERSIONS_V0 = new Schema(new Field("api_key", INT16, "API key."),
+ new Field("min_version", INT16, "Minimum supported version."),
+ new Field("max_version", INT16, "Maximum supported version."));
+
+ public static final Schema API_VERSIONS_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."),
+ new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."));
+
+ public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0};
+ public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0};
+
/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
+ public static final short[] MIN_VERSIONS = new short[ApiKeys.MAX_API_KEY + 1];
/* the latest version of each api */
public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
@@ -786,6 +800,7 @@ public class Protocol {
REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST;
REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST;
REQUESTS[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_REQUEST;
+ REQUESTS[ApiKeys.API_VERSIONS.id] = API_VERSIONS_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -805,16 +820,32 @@ public class Protocol {
RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE;
RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE;
RESPONSES[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_RESPONSE;
+ RESPONSES[ApiKeys.API_VERSIONS.id] = API_VERSIONS_RESPONSE;
- /* set the maximum version of each api */
- for (ApiKeys api : ApiKeys.values())
+ /* set the minimum and maximum version of each api */
+ for (ApiKeys api : ApiKeys.values()) {
CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
+ for (int i = 0; i < REQUESTS[api.id].length; ++i)
+ if (REQUESTS[api.id][i] != null) {
+ MIN_VERSIONS[api.id] = (short) i;
+ break;
+ }
+ }
- /* sanity check that we have the same number of request and response versions for each api */
- for (ApiKeys api : ApiKeys.values())
+ /* sanity check that:
+ * - we have the same number of request and response versions for each api
+ * - we have a consistent set of request and response versions for each api */
+ for (ApiKeys api : ApiKeys.values()) {
if (REQUESTS[api.id].length != RESPONSES[api.id].length)
throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name
+ " but " + RESPONSES[api.id].length + " response versions.");
+
+ for (int i = 0; i < REQUESTS[api.id].length; ++i)
+ if ((REQUESTS[api.id][i] == null && RESPONSES[api.id][i] != null) ||
+ (REQUESTS[api.id][i] != null && RESPONSES[api.id][i] == null))
+ throw new IllegalStateException("Request and response for version " + i + " of API "
+ + api.id + " are defined inconsistently. One is null while the other is not null.");
+ }
}
private static String indentString(int size) {
@@ -977,4 +1008,4 @@ public class Protocol {
System.out.println(toHtml());
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 89c2ce1..ab61c66 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -74,9 +74,11 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return ListGroupsRequest.parse(buffer, versionId);
case SASL_HANDSHAKE:
return SaslHandshakeRequest.parse(buffer, versionId);
+ case API_VERSIONS:
+ return ApiVersionsRequest.parse(buffer, versionId);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
new file mode 100644
index 0000000..b78c759
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+public class ApiVersionsRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.API_VERSIONS.id);
+
+ public ApiVersionsRequest() {
+ super(new Struct(CURRENT_SCHEMA));
+ }
+
+ public ApiVersionsRequest(Struct struct) {
+ super(struct);
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ short errorCode = Errors.forException(e).code();
+ return new ApiVersionsResponse(errorCode, Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.API_VERSIONS.id)));
+ }
+ }
+
+ public static ApiVersionsRequest parse(ByteBuffer buffer, int versionId) {
+ return new ApiVersionsRequest(ProtoUtils.parseRequest(ApiKeys.API_VERSIONS.id, versionId, buffer));
+ }
+
+ public static ApiVersionsRequest parse(ByteBuffer buffer) {
+ return new ApiVersionsRequest(CURRENT_SCHEMA.read(buffer));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
new file mode 100644
index 0000000..36881a3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ApiVersionsResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id);
+
+ public static final String ERROR_CODE_KEY_NAME = "error_code";
+ public static final String API_VERSIONS_KEY_NAME = "api_versions";
+ public static final String API_KEY_NAME = "api_key";
+ public static final String MIN_VERSION_KEY_NAME = "min_version";
+ public static final String MAX_VERSION_KEY_NAME = "max_version";
+
+ /**
+ * Possible error codes:
+ *
+ * UNSUPPORTED_VERSION (35)
+ */
+ private final short errorCode;
+ private final Map<Short, ApiVersion> apiKeyToApiVersion;
+
+ public static final class ApiVersion {
+ public final short apiKey;
+ public final short minVersion;
+ public final short maxVersion;
+
+ public ApiVersion(short apiKey, short minVersion, short maxVersion) {
+ this.apiKey = apiKey;
+ this.minVersion = minVersion;
+ this.maxVersion = maxVersion;
+ }
+ }
+
+ public ApiVersionsResponse(short errorCode, List<ApiVersion> apiVersions) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ List<Struct> apiVersionList = new ArrayList<>();
+ for (ApiVersion apiVersion : apiVersions) {
+ Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME);
+ apiVersionStruct.set(API_KEY_NAME, apiVersion.apiKey);
+ apiVersionStruct.set(MIN_VERSION_KEY_NAME, apiVersion.minVersion);
+ apiVersionStruct.set(MAX_VERSION_KEY_NAME, apiVersion.maxVersion);
+ apiVersionList.add(apiVersionStruct);
+ }
+ struct.set(API_VERSIONS_KEY_NAME, apiVersionList.toArray());
+ this.errorCode = errorCode;
+ this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions);
+ }
+
+ public ApiVersionsResponse(Struct struct) {
+ super(struct);
+ this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ List<ApiVersion> tempApiVersions = new ArrayList<>();
+ for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) {
+ Struct apiVersionStruct = (Struct) apiVersionsObj;
+ short apiKey = apiVersionStruct.getShort(API_KEY_NAME);
+ short minVersion = apiVersionStruct.getShort(MIN_VERSION_KEY_NAME);
+ short maxVersion = apiVersionStruct.getShort(MAX_VERSION_KEY_NAME);
+ tempApiVersions.add(new ApiVersion(apiKey, minVersion, maxVersion));
+ }
+ this.apiKeyToApiVersion = buildApiKeyToApiVersion(tempApiVersions);
+ }
+
+ public Collection<ApiVersion> apiVersions() {
+ return apiKeyToApiVersion.values();
+ }
+
+ public ApiVersion apiVersion(short apiKey) {
+ return apiKeyToApiVersion.get(apiKey);
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public static ApiVersionsResponse parse(ByteBuffer buffer) {
+ return new ApiVersionsResponse(CURRENT_SCHEMA.read(buffer));
+ }
+
+ public static ApiVersionsResponse fromError(Errors error) {
+ return new ApiVersionsResponse(error.code(), Collections.<ApiVersion>emptyList());
+ }
+
+ private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) {
+ Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>();
+ for (ApiVersion apiVersion: apiVersions) {
+ tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion);
+ }
+ return tempApiIdToApiVersion;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0018f53..345de3f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -95,7 +95,10 @@ public class RequestResponseTest {
createLeaderAndIsrResponse(),
createSaslHandshakeRequest(),
createSaslHandshakeRequest().getErrorResponse(0, new UnknownServerException()),
- createSaslHandshakeResponse()
+ createSaslHandshakeResponse(),
+ createApiVersionRequest(),
+ createApiVersionRequest().getErrorResponse(0, new UnknownServerException()),
+ createApiVersionResponse()
);
for (AbstractRequestResponse req : requestResponseList)
@@ -438,4 +441,13 @@ public class RequestResponseTest {
private AbstractRequestResponse createSaslHandshakeResponse() {
return new SaslHandshakeResponse(Errors.NONE.code(), Collections.singletonList("GSSAPI"));
}
-}
+
+ private AbstractRequest createApiVersionRequest() {
+ return new ApiVersionsRequest();
+ }
+
+ private AbstractRequestResponse createApiVersionResponse() {
+ List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
+ return new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 406b1bd..67d46fc 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import java.lang.{Long => JLong, Short => JShort}
import java.util.Properties
-import kafka.admin.{RackAwareMode, AdminUtils}
+import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api._
import kafka.cluster.Partition
import kafka.common
@@ -31,27 +31,32 @@ import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
import kafka.log._
import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
import kafka.network._
-import kafka.network.RequestChannel.{Session, Response}
-import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
+import kafka.network.RequestChannel.{Response, Session}
+import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Topic, Write}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException,
-ClusterAuthorizationException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
-DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
-LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
-StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse,
-MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
+import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{TopicPartition, Node}
+import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.internals.TopicConstants
import scala.collection._
import scala.collection.JavaConverters._
import org.apache.kafka.common.requests.SaslHandshakeResponse
+object KafkaApis {
+ val apiVersionsResponse = new ApiVersionsResponse(Errors.NONE.code, buildApiKeysToApiVersions.values.toList.asJava)
+
+ private def buildApiKeysToApiVersions: Map[Short, ApiVersionsResponse.ApiVersion] = {
+ ApiKeys.values.map(apiKey =>
+ apiKey.id -> new ApiVersionsResponse.ApiVersion(apiKey.id, Protocol.MIN_VERSIONS(apiKey.id), Protocol.CURR_VERSION(apiKey.id))).toMap
+ }
+}
+
+
/**
* Logic to handle the various Kafka requests
*/
@@ -74,7 +79,7 @@ class KafkaApis(val requestChannel: RequestChannel,
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
- try{
+ try {
trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
ApiKeys.forId(request.requestId) match {
@@ -96,6 +101,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
+ case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -143,7 +149,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val responseHeader = new ResponseHeader(correlationId)
- val leaderAndIsrResponse=
+ val leaderAndIsrResponse =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
@@ -234,7 +240,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
- val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
+ val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
}
@@ -251,7 +257,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
val responseHeader = new ResponseHeader(header.correlationId)
- val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
+ val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
@@ -376,7 +382,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val respHeader = new ResponseHeader(request.header.correlationId)
val respBody = request.header.apiVersion match {
case 0 => new ProduceResponse(mergedResponseStatus.asJava)
- case version@ (1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
+ case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
// This case shouldn't happen unless a new version of ProducerRequest is added without
// updating this part of the code to handle it properly.
case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
@@ -426,7 +432,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
- val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
+ val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
}
@@ -552,14 +558,14 @@ class KafkaApis(val requestChannel: RequestChannel,
case utpe: UnknownTopicOrPartitionException =>
debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
correlationId, clientId, topicPartition, utpe.getMessage))
- (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(utpe).code, List[JLong]().asJava))
+ (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(utpe).code, List[JLong]().asJava))
case nle: NotLeaderForPartitionException =>
debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
correlationId, clientId, topicPartition,nle.getMessage))
- (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(nle).code, List[JLong]().asJava))
+ (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(nle).code, List[JLong]().asJava))
case e: Throwable =>
error("Error while responding to offset request", e)
- (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
+ (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
}
})
@@ -591,7 +597,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else
offsetTimeArray = new Array[(Long, Long)](segsArray.length)
- for(i <- 0 until segsArray.length)
+ for (i <- 0 until segsArray.length)
offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
if (segsArray.last.size > 0)
offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
@@ -610,18 +616,18 @@ class KafkaApis(val requestChannel: RequestChannel,
if (offsetTimeArray(startIndex)._2 <= timestamp)
isFound = true
else
- startIndex -=1
+ startIndex -= 1
}
}
val retSize = maxNumOffsets.min(startIndex + 1)
val ret = new Array[Long](retSize)
- for(j <- 0 until retSize) {
+ for (j <- 0 until retSize) {
ret(j) = offsetTimeArray(startIndex)._1
startIndex -= 1
}
// ensure that the returned seq is in descending order of offsets
- ret.toSeq.sortBy(- _)
+ ret.toSeq.sortBy(-_)
}
private def createTopic(topic: String,
@@ -871,7 +877,7 @@ class KafkaApis(val requestChannel: RequestChannel,
ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
} else {
val (error, groups) = coordinator.handleListGroups()
- val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
+ val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
new ListGroupsResponse(error.code, allGroups.asJava)
}
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
@@ -1024,6 +1030,23 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
}
+ def handleApiVersionsRequest(request: RequestChannel.Request) {
+ // Replies with the broker's full list of supported ApiKeys and versions, or UNSUPPORTED_VERSION if the
+ // request's own version is out of range. The list is returned regardless of the current authentication
+ // state (e.g., before SASL authentication on a SASL listener; note that no Kafka protocol requests may
+ // take place on an SSL listener before the SSL handshake is finished). If this is considered to leak
+ // broker version information, a workaround is to use SSL with client authentication, which is performed
+ // at an earlier stage of the connection, where the ApiVersionsRequest is not available.
+ val responseHeader = new ResponseHeader(request.header.correlationId)
+ val isApiVersionsRequestVersionSupported = request.header.apiVersion <= Protocol.CURR_VERSION(ApiKeys.API_VERSIONS.id) &&
+ request.header.apiVersion >= Protocol.MIN_VERSIONS(ApiKeys.API_VERSIONS.id)
+ val responseBody = if (isApiVersionsRequestVersionSupported)
+ KafkaApis.apiVersionsResponse
+ else
+ ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+ }
+
def close() {
quotaManagers.foreach { case (apiKey, quotaManager) =>
quotaManager.shutdown()
@@ -1035,4 +1058,4 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
new file mode 100644
index 0000000..4429f26
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+import kafka.server.KafkaApis
+import org.apache.kafka.common.protocol.{Protocol, ApiKeys}
+import org.junit.Assert._
+import org.junit.Test
+
+class ApiVersionsTest {
+  // Checks KafkaApis.apiVersionsResponse against Protocol's version tables, which act as the expected values.
+  @Test
+  def testApiVersions {
+    val apiVersions = KafkaApis.apiVersionsResponse.apiVersions
+    assertEquals("API versions for all API keys must be maintained.", ApiKeys.values().length, apiVersions.size)
+
+    for (key <- ApiKeys.values) {
+      val version = KafkaApis.apiVersionsResponse.apiVersion(key.id)
+      assertNotNull(s"Could not find ApiVersion for API ${key.name}", version)
+      assertEquals(s"Incorrect min version for Api ${key.name}.", Protocol.MIN_VERSIONS(key.id), version.minVersion)
+      assertEquals(s"Incorrect max version for Api ${key.name}.", Protocol.CURR_VERSION(key.id), version.maxVersion)
+
+      // Schemas for versions below the advertised min version must be null, i.e., deprecated.
+      for (i <- 0 until version.minVersion) {
+        assertNull(s"Request version $i for API ${version.apiKey} must be null.", Protocol.REQUESTS(version.apiKey)(i))
+        assertNull(s"Response version $i for API ${version.apiKey} must be null.", Protocol.RESPONSES(version.apiKey)(i))
+      }
+
+      // Schemas for every version from min to max (inclusive) must be non-null, i.e., valid.
+      for (i <- version.minVersion.toInt to version.maxVersion) {
+        assertNotNull(s"Request version $i for API ${version.apiKey} must not be null.", Protocol.REQUESTS(version.apiKey)(i))
+        assertNotNull(s"Response version $i for API ${version.apiKey} must not be null.", Protocol.RESPONSES(version.apiKey)(i))
+      }
+    }
+  }
+}
\ No newline at end of file