You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/05/05 01:16:13 UTC

kafka git commit: KAFKA-3652; Return error response for unsupported version of ApiVersionsRequest

Repository: kafka
Updated Branches:
  refs/heads/trunk c8c6ac3f6 -> 64451af9e


KAFKA-3652; Return error response for unsupported version of ApiVersionsRequest

Handle unsupported version of ApiVersionsRequest during SASL auth as well as normal operation by returning an error response.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #1310 from rajinisivaram/KAFKA-3652


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64451af9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64451af9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64451af9

Branch: refs/heads/trunk
Commit: 64451af9e08de428064dc232cd6dea0ea0b2a81d
Parents: c8c6ac3
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed May 4 18:16:08 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed May 4 18:16:08 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  |  4 +
 .../authenticator/SaslServerAuthenticator.java  | 77 ++++++++++++--------
 .../authenticator/SaslAuthenticatorTest.java    | 62 ++++++++++++++++
 .../scala/kafka/network/RequestChannel.scala    | 13 +++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  4 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |  8 +-
 .../server/SaslApiVersionsRequestTest.scala     | 17 ++++-
 7 files changed, 146 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 3644f9c..ec74427 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -846,6 +846,10 @@ public class Protocol {
         }
     }
 
+    public static boolean apiVersionSupported(short apiKey, short apiVersion) {
+        return apiKey < CURR_VERSION.length && apiVersion >= MIN_VERSIONS[apiKey] && apiVersion <= CURR_VERSION[apiKey];
+    }
+
     private static String indentString(int size) {
         StringBuilder b = new StringBuilder(size);
         for (int i = 0; i < size; i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index a9c19a5..e1074a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkSend;
@@ -58,6 +59,7 @@ import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractRequestResponse;
@@ -75,7 +77,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
 
     public enum SaslState {
-        HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
+        GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
     }
 
     private final String node;
@@ -85,7 +87,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private final String host;
 
     // Current SASL state
-    private SaslState saslState = SaslState.HANDSHAKE_REQUEST;
+    private SaslState saslState = SaslState.GSSAPI_OR_HANDSHAKE_REQUEST;
     // Next SASL state to be set when outgoing writes associated with the current SASL state complete
     private SaslState pendingSaslState = null;
     private SaslServer saslServer;
@@ -215,6 +217,9 @@ public class SaslServerAuthenticator implements Authenticator {
             try {
                 switch (saslState) {
                     case HANDSHAKE_REQUEST:
+                        handleKafkaRequest(clientToken);
+                        break;
+                    case GSSAPI_OR_HANDSHAKE_REQUEST:
                         if (handleKafkaRequest(clientToken))
                             break;
                         // For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
@@ -288,39 +293,53 @@ public class SaslServerAuthenticator implements Authenticator {
         try {
             ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
             RequestHeader requestHeader = RequestHeader.parse(requestBuffer);
-            AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer);
+            ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
+            // A valid Kafka request header was received. SASL authentication tokens are now expected only
+            // following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client.
+            setSaslState(SaslState.HANDSHAKE_REQUEST);
             isKafkaRequest = true;
 
-            ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
-            LOG.debug("Handle Kafka request {}", apiKey);
-            switch (apiKey) {
-                case API_VERSIONS:
-                    handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
-                    break;
-                case SASL_HANDSHAKE:
-                    clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
-                    break;
-                default:
-                    throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
+            if (!Protocol.apiVersionSupported(requestHeader.apiKey(), requestHeader.apiVersion())) {
+                if (apiKey == ApiKeys.API_VERSIONS)
+                    sendKafkaResponse(requestHeader, ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION));
+                else
+                    throw new UnsupportedVersionException("Version " + requestHeader.apiVersion() + " is not supported for apiKey " + apiKey);
+            } else {
+                AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer);
+
+                LOG.debug("Handle Kafka request {}", apiKey);
+                switch (apiKey) {
+                    case API_VERSIONS:
+                        handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
+                        break;
+                    case SASL_HANDSHAKE:
+                        clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
+                        break;
+                    default:
+                        throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
+                }
             }
         } catch (SchemaException | IllegalArgumentException e) {
-            // SchemaException is thrown if the request is not in Kafka format. IIlegalArgumentException is thrown
-            // if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
-            // starting with 0x60, revert to GSSAPI for both these exceptions.
-            if (LOG.isDebugEnabled()) {
-                StringBuilder tokenBuilder = new StringBuilder();
-                for (byte b : requestBytes) {
-                    tokenBuilder.append(String.format("%02x", b));
-                    if (tokenBuilder.length() >= 20)
-                         break;
+            if (saslState == SaslState.GSSAPI_OR_HANDSHAKE_REQUEST) {
+                // SchemaException is thrown if the request is not in Kafka format. IllegalArgumentException is thrown
+                // if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
+                // starting with 0x60, revert to GSSAPI for both these exceptions.
+                if (LOG.isDebugEnabled()) {
+                    StringBuilder tokenBuilder = new StringBuilder();
+                    for (byte b : requestBytes) {
+                        tokenBuilder.append(String.format("%02x", b));
+                        if (tokenBuilder.length() >= 20)
+                             break;
+                    }
+                    LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
                 }
-                LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
-            }
-            if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
-                LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
-                clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
+                if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
+                    LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
+                    clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
+                } else
+                    throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
             } else
-                throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
+                throw e;
         }
         if (clientMechanism != null) {
             createSaslServer(clientMechanism);

http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 368b5a7..97fe3d8 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasUtils;
@@ -244,6 +245,62 @@ public class SaslAuthenticatorTest {
     }
 
     /**
+     * Tests that unsupported version of ApiVersionsRequest before SASL handshake request
+     * returns error response and does not result in authentication failure. This test
+     * is similar to {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * where a non-SASL client is used to send requests that are processed by
+     * {@link SaslServerAuthenticator} of the server prior to client authentication.
+     */
+    @Test
+    public void testApiVersionsRequestWithUnsupportedVersion() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+
+        // Send ApiVersionsRequest with unsupported version and validate error response.
+        String node = "1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node);
+        RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, Short.MAX_VALUE, "someclient", 1);
+        selector.send(new NetworkSend(node, RequestSend.serialize(header, new ApiVersionsRequest().toStruct())));
+        ByteBuffer responseBuffer = waitForResponse();
+        ResponseHeader.parse(responseBuffer);
+        ApiVersionsResponse response = ApiVersionsResponse.parse(responseBuffer);
+        assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.errorCode());
+
+        // Send ApiVersionsRequest with a supported version. This should succeed.
+        sendVersionRequestReceiveResponse(node);
+
+        // Test that client can authenticate successfully
+        sendHandshakeRequestReceiveResponse(node);
+        authenticateUsingSaslPlainAndCheckConnection(node);
+    }
+
+    /**
+     * Tests that unsupported version of SASL handshake request returns error
+     * response and fails authentication. This test is similar to
+     * {@link #testUnauthenticatedApiVersionsRequest(SecurityProtocol)}
+     * where a non-SASL client is used to send requests that are processed by
+     * {@link SaslServerAuthenticator} of the server prior to client authentication.
+     */
+    @Test
+    public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+        server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
+
+        // Send SaslHandshakeRequest with unsupported version and validate that the connection is closed.
+        String node1 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node1);
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2);
+        selector.send(new NetworkSend(node1, RequestSend.serialize(header, new SaslHandshakeRequest("PLAIN").toStruct())));
+        NetworkTestUtils.waitForChannelClose(selector, node1);
+        selector.close();
+
+        // Test good connection still works
+        createAndCheckClientConnection(securityProtocol, "good1");
+    }
+
+    /**
      * Tests that any invalid data during Kafka SASL handshake request flow
      * or the actual SASL authentication flow result in authentication failure
      * and do not cause any failures in the server.
@@ -485,6 +542,11 @@ public class SaslAuthenticatorTest {
         SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node);
         assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms());
 
+        // Complete manual authentication and check send/receive succeed
+        authenticateUsingSaslPlainAndCheckConnection(node);
+    }
+
+    private void authenticateUsingSaslPlainAndCheckConnection(String node) throws Exception {
         // Authenticate using PLAIN username/password
         String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + TestJaasConfig.PASSWORD;
         selector.send(new NetworkSend(node, ByteBuffer.wrap(authString.getBytes("UTF-8"))));

http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 17c5b9b..e2000db 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -28,8 +28,8 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{Logging, SystemTime}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader}
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol, Protocol}
+import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader, ApiVersionsRequest}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.log4j.Logger
 
@@ -84,8 +84,13 @@ object RequestChannel extends Logging {
         null
     val body: AbstractRequest =
       if (requestObj == null)
-        try AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
-        catch {
+        try {
+          // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
+          if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
+            new ApiVersionsRequest
+          else
+            AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
+        } catch {
           case ex: Throwable =>
             throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index eb6358d..086bd4b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1029,9 +1029,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
     val responseHeader = new ResponseHeader(request.header.correlationId)
-    val isApiVersionsRequestVersionSupported = request.header.apiVersion <= Protocol.CURR_VERSION(ApiKeys.API_VERSIONS.id) &&
-                                              request.header.apiVersion >= Protocol.MIN_VERSIONS(ApiKeys.API_VERSIONS.id)
-    val responseBody = if (isApiVersionsRequestVersionSupported)
+    val responseBody = if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
       ApiVersionsResponse.apiVersionsResponse
     else
       ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)

http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 8bf4d73..f2dd60f 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.junit.Assert._
@@ -48,6 +48,12 @@ class ApiVersionsRequestTest extends BaseRequestTest {
     ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse)
   }
 
+  @Test
+  def testApiVersionsRequestWithUnsupportedVersion() {
+    val apiVersionsResponse = sendApiVersionsRequest(new ApiVersionsRequest, Short.MaxValue)
+    assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
+  }
+
   private def sendApiVersionsRequest(request: ApiVersionsRequest, version: Short): ApiVersionsResponse = {
     val response = send(request, ApiKeys.API_VERSIONS, version)
     ApiVersionsResponse.parse(response)

http://git-wip-us.apache.org/repos/asf/kafka/blob/64451af9/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 632665a..8557008 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -19,11 +19,10 @@ package kafka.server
 import java.io.IOException
 import java.net.Socket
 import java.util.Collections
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.apache.kafka.common.requests.SaslHandshakeRequest
 import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.apache.kafka.common.protocol.Errors
 import org.junit.Test
 import org.junit.Assert._
 import kafka.api.SaslTestHarness
@@ -64,6 +63,20 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
     }
   }
 
+  @Test
+  def testApiVersionsRequestWithUnsupportedVersion() {
+    val plaintextSocket = connect(protocol = securityProtocol)
+    try {
+      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, Short.MaxValue)
+      assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
+      val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0)
+      ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse2)
+      sendSaslHandshakeRequestValidateResponse(plaintextSocket)
+    } finally {
+      plaintextSocket.close()
+    }
+  }
+
   private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest, version: Short): ApiVersionsResponse = {
     val response = send(socket, request, ApiKeys.API_VERSIONS, version)
     ApiVersionsResponse.parse(response)