You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/05/05 01:16:13 UTC
kafka git commit: KAFKA-3652;
Return error response for unsupported version of ApiVersionsRequest
Repository: kafka
Updated Branches:
refs/heads/trunk c8c6ac3f6 -> 64451af9e
KAFKA-3652; Return error response for unsupported version of ApiVersionsRequest
Handle unsupported version of ApiVersionsRequest during SASL auth as well as normal operation by returning an error response.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #1310 from rajinisivaram/KAFKA-3652
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64451af9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64451af9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64451af9
Branch: refs/heads/trunk
Commit: 64451af9e08de428064dc232cd6dea0ea0b2a81d
Parents: c8c6ac3
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed May 4 18:16:08 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed May 4 18:16:08 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/common/protocol/Protocol.java | 4 +
.../authenticator/SaslServerAuthenticator.java | 77 ++++++++++++--------
.../authenticator/SaslAuthenticatorTest.java | 62 ++++++++++++++++
.../scala/kafka/network/RequestChannel.scala | 13 +++-
.../src/main/scala/kafka/server/KafkaApis.scala | 4 +-
.../kafka/server/ApiVersionsRequestTest.scala | 8 +-
.../server/SaslApiVersionsRequestTest.scala | 17 ++++-
7 files changed, 146 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 3644f9c..ec74427 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -846,6 +846,10 @@ public class Protocol {
}
}
+ public static boolean apiVersionSupported(short apiKey, short apiVersion) {
+ return apiKey < CURR_VERSION.length && apiVersion >= MIN_VERSIONS[apiKey] && apiVersion <= CURR_VERSION[apiKey];
+ }
+
private static String indentString(int size) {
StringBuilder b = new StringBuilder(size);
for (int i = 0; i < size; i++)
http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index a9c19a5..e1074a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkSend;
@@ -58,6 +59,7 @@ import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractRequestResponse;
@@ -75,7 +77,7 @@ public class SaslServerAuthenticator implements Authenticator {
private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
public enum SaslState {
- HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
+ GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
}
private final String node;
@@ -85,7 +87,7 @@ public class SaslServerAuthenticator implements Authenticator {
private final String host;
// Current SASL state
- private SaslState saslState = SaslState.HANDSHAKE_REQUEST;
+ private SaslState saslState = SaslState.GSSAPI_OR_HANDSHAKE_REQUEST;
// Next SASL state to be set when outgoing writes associated with the current SASL state complete
private SaslState pendingSaslState = null;
private SaslServer saslServer;
@@ -215,6 +217,9 @@ public class SaslServerAuthenticator implements Authenticator {
try {
switch (saslState) {
case HANDSHAKE_REQUEST:
+ handleKafkaRequest(clientToken);
+ break;
+ case GSSAPI_OR_HANDSHAKE_REQUEST:
if (handleKafkaRequest(clientToken))
break;
// For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
@@ -288,39 +293,53 @@ public class SaslServerAuthenticator implements Authenticator {
try {
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
RequestHeader requestHeader = RequestHeader.parse(requestBuffer);
- AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer);
+ ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
+ // A valid Kafka request header was received. SASL authentication tokens are now expected only
+ // following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client.
+ setSaslState(SaslState.HANDSHAKE_REQUEST);
isKafkaRequest = true;
- ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
- LOG.debug("Handle Kafka request {}", apiKey);
- switch (apiKey) {
- case API_VERSIONS:
- handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
- break;
- case SASL_HANDSHAKE:
- clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
- break;
- default:
- throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
+ if (!Protocol.apiVersionSupported(requestHeader.apiKey(), requestHeader.apiVersion())) {
+ if (apiKey == ApiKeys.API_VERSIONS)
+ sendKafkaResponse(requestHeader, ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION));
+ else
+ throw new UnsupportedVersionException("Version " + requestHeader.apiVersion() + " is not supported for apiKey " + apiKey);
+ } else {
+ AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer);
+
+ LOG.debug("Handle Kafka request {}", apiKey);
+ switch (apiKey) {
+ case API_VERSIONS:
+ handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
+ break;
+ case SASL_HANDSHAKE:
+ clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
+ break;
+ default:
+ throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
+ }
}
} catch (SchemaException | IllegalArgumentException e) {
- // SchemaException is thrown if the request is not in Kafka format. IIlegalArgumentException is thrown
- // if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
- // starting with 0x60, revert to GSSAPI for both these exceptions.
- if (LOG.isDebugEnabled()) {
- StringBuilder tokenBuilder = new StringBuilder();
- for (byte b : requestBytes) {
- tokenBuilder.append(String.format("%02x", b));
- if (tokenBuilder.length() >= 20)
- break;
+ if (saslState == SaslState.GSSAPI_OR_HANDSHAKE_REQUEST) {
+ // SchemaException is thrown if the request is not in Kafka format. IIlegalArgumentException is thrown
+ // if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
+ // starting with 0x60, revert to GSSAPI for both these exceptions.
+ if (LOG.isDebugEnabled()) {
+ StringBuilder tokenBuilder = new StringBuilder();
+ for (byte b : requestBytes) {
+ tokenBuilder.append(String.format("%02x", b));
+ if (tokenBuilder.length() >= 20)
+ break;
+ }
+ LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
}
- LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
- }
- if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
- LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
- clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
+ if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
+ LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
+ clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
+ } else
+ throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
} else
- throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
+ throw e;
}
if (clientMechanism != null) {
createSaslServer(clientMechanism);
http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 368b5a7..97fe3d8 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.JaasUtils;
@@ -244,6 +245,62 @@ public class SaslAuthenticatorTest {
}
/**
+ * Tests that unsupported version of ApiVersionsRequest before SASL handshake request
+ * returns error response and does not result in authentication failure. This test
+ * is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+ * where a non-SASL client is used to send requests that are processed by
+ * {@link SaslServerAuthenticator} of the server prior to client authentication.
+ */
+ @Test
+ public void testApiVersionsRequestWithUnsupportedVersion() throws Exception {
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+ configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+ server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+
+ // Send ApiVersionsRequest with unsupported version and validate error response.
+ String node = "1";
+ createClientConnection(SecurityProtocol.PLAINTEXT, node);
+ RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, Short.MAX_VALUE, "someclient", 1);
+ selector.send(new NetworkSend(node, RequestSend.serialize(header, new ApiVersionsRequest().toStruct())));
+ ByteBuffer responseBuffer = waitForResponse();
+ ResponseHeader.parse(responseBuffer);
+ ApiVersionsResponse response = ApiVersionsResponse.parse(responseBuffer);
+ assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.errorCode());
+
+ // Send ApiVersionsRequest with a supported version. This should succeed.
+ sendVersionRequestReceiveResponse(node);
+
+ // Test that client can authenticate successfully
+ sendHandshakeRequestReceiveResponse(node);
+ authenticateUsingSaslPlainAndCheckConnection(node);
+ }
+
+ /**
+ * Tests that unsupported version of SASL handshake request returns error
+ * response and fails authentication. This test is similar to
+ * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+ * where a non-SASL client is used to send requests that are processed by
+ * {@link SaslServerAuthenticator} of the server prior to client authentication.
+ */
+ @Test
+ public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception {
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+ configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+ server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+
+ // Send ApiVersionsRequest and validate error response.
+ String node1 = "invalid1";
+ createClientConnection(SecurityProtocol.PLAINTEXT, node1);
+ RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2);
+ selector.send(new NetworkSend(node1, RequestSend.serialize(header, new SaslHandshakeRequest("PLAIN").toStruct())));
+ NetworkTestUtils.waitForChannelClose(selector, node1);
+ selector.close();
+
+ // Test good connection still works
+ createAndCheckClientConnection(securityProtocol, "good1");
+ }
+
+ /**
* Tests that any invalid data during Kafka SASL handshake request flow
* or the actual SASL authentication flow result in authentication failure
* and do not cause any failures in the server.
@@ -485,6 +542,11 @@ public class SaslAuthenticatorTest {
SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node);
assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms());
+ // Complete manual authentication and check send/receive succeed
+ authenticateUsingSaslPlainAndCheckConnection(node);
+ }
+
+ private void authenticateUsingSaslPlainAndCheckConnection(String node) throws Exception {
// Authenticate using PLAIN username/password
String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + TestJaasConfig.PASSWORD;
selector.send(new NetworkSend(node, ByteBuffer.wrap(authString.getBytes("UTF-8"))));
http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 17c5b9b..e2000db 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -28,8 +28,8 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{Logging, SystemTime}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader}
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol, Protocol}
+import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader, ApiVersionsRequest}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.log4j.Logger
@@ -84,8 +84,13 @@ object RequestChannel extends Logging {
null
val body: AbstractRequest =
if (requestObj == null)
- try AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
- catch {
+ try {
+ // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
+ if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
+ new ApiVersionsRequest
+ else
+ AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
+ } catch {
case ex: Throwable =>
throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index eb6358d..086bd4b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1029,9 +1029,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
val responseHeader = new ResponseHeader(request.header.correlationId)
- val isApiVersionsRequestVersionSupported = request.header.apiVersion <= Protocol.CURR_VERSION(ApiKeys.API_VERSIONS.id) &&
- request.header.apiVersion >= Protocol.MIN_VERSIONS(ApiKeys.API_VERSIONS.id)
- val responseBody = if (isApiVersionsRequestVersionSupported)
+ val responseBody = if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
ApiVersionsResponse.apiVersionsResponse
else
ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)
http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 8bf4d73..f2dd60f 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -17,7 +17,7 @@
package kafka.server
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.junit.Assert._
@@ -48,6 +48,12 @@ class ApiVersionsRequestTest extends BaseRequestTest {
ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse)
}
+ @Test
+ def testApiVersionsRequestWithUnsupportedVersion() {
+ val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, Short.MaxValue)
+ assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
+ }
+
private def sendApiVersionsRequest(request: ApiVersionsRequest, version: Short): ApiVersionsResponse = {
val response = send(request, ApiKeys.API_VERSIONS, version)
ApiVersionsResponse.parse(response)
http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 632665a..8557008 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -19,11 +19,10 @@ package kafka.server
import java.io.IOException
import java.net.Socket
import java.util.Collections
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.apache.kafka.common.requests.SaslHandshakeRequest
import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.apache.kafka.common.protocol.Errors
import org.junit.Test
import org.junit.Assert._
import kafka.api.SaslTestHarness
@@ -64,6 +63,20 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
}
}
+ @Test
+ def testApiVersionsRequestWithUnsupportedVersion() {
+ val plaintextSocket = connect(protocol = securityProtocol)
+ try {
+ val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, Short.MaxValue)
+ assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
+ val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0)
+ ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse2)
+ sendSaslHandshakeRequestValidateResponse(plaintextSocket)
+ } finally {
+ plaintextSocket.close()
+ }
+ }
+
private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest, version: Short): ApiVersionsResponse = {
val response = send(socket, request, ApiKeys.API_VERSIONS, version)
ApiVersionsResponse.parse(response)