You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/04/16 12:32:44 UTC

[kafka] branch 2.5 updated: KAFKA-9854 Re-authenticating causes mismatched parse of response (#8471)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new c5fb266  KAFKA-9854 Re-authenticating causes mismatched parse of response (#8471)
c5fb266 is described below

commit c5fb266df1c3c5626e9d4221120e99e03d4d07f4
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu Apr 16 20:26:30 2020 +0800

    KAFKA-9854 Re-authenticating causes mismatched parse of response (#8471)
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>, Ron Dagostino <rd...@confluent.io>
---
 .../org/apache/kafka/clients/NetworkClient.java    | 20 ++++++++-
 .../authenticator/SaslClientAuthenticator.java     | 41 ++++++++++++++++-
 .../apache/kafka/clients/NetworkClientTest.java    | 14 ++++++
 .../authenticator/SaslAuthenticatorTest.java       | 52 ++++++++++++++++++++++
 4 files changed, 123 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4a728ba..cd3a4bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -938,9 +939,15 @@ public class NetworkClient implements KafkaClient {
      * Validate that the response corresponds to the request we expect or else explode
      */
     private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
-        if (requestHeader.correlationId() != responseHeader.correlationId())
+        if (requestHeader.correlationId() != responseHeader.correlationId()) {
+            if (SaslClientAuthenticator.isReserved(requestHeader.correlationId())
+                    && !SaslClientAuthenticator.isReserved(responseHeader.correlationId()))
+                throw new SchemaException("the response is unrelated to Sasl request since its correlation id is " + responseHeader.correlationId()
+                    + " and the reserved range for Sasl request is [ "
+                    + SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID + "," + SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID + "]");
             throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
                     + ") does not match request (" + requestHeader.correlationId() + "), request header: " + requestHeader);
+        }
     }
 
     /**
@@ -1155,6 +1162,15 @@ public class NetworkClient implements KafkaClient {
         return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, defaultRequestTimeoutMs, null);
     }
 
+    // visible for testing
+    int nextCorrelationId() {
+        if (SaslClientAuthenticator.isReserved(correlation)) {
+            // the numeric overflow is fine as negative values are acceptable
+            correlation = SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID + 1;
+        }
+        return correlation++;
+    }
+
     @Override
     public ClientRequest newClientRequest(String nodeId,
                                           AbstractRequest.Builder<?> requestBuilder,
@@ -1162,7 +1178,7 @@ public class NetworkClient implements KafkaClient {
                                           boolean expectResponse,
                                           int requestTimeoutMs,
                                           RequestCompletionHandler callback) {
-        return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, createdTimeMs, expectResponse,
+        return new ClientRequest(nodeId, requestBuilder, nextCorrelationId(), clientId, createdTimeMs, expectResponse,
                 requestTimeoutMs, callback);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index e972da1..8b32e81 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -105,6 +105,35 @@ public class SaslClientAuthenticator implements Authenticator {
     private static final short DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER = -1;
     private static final Random RNG = new Random();
 
+    /**
+     * The reserved range of correlation ids for SASL requests.
+     *
+     * Note: the reserved range exists because a LIST_OFFSET response can be parsed with the SASL_HANDSHAKE response schema.
+     * Hence, a schema error can go undetected when the schema of SASL_HANDSHAKE is used to parse a LIST_OFFSET response.
+     * For example, an IllegalStateException caused by a mismatched correlation id is thrown if the following steps happen:
+     * 1) send LIST_OFFSET
+     * 2) send SASL_HANDSHAKE
+     * 3) receive the response of LIST_OFFSET
+     * 4) successfully parse the response of LIST_OFFSET with the schema of SASL_HANDSHAKE
+     * 5) throw IllegalStateException due to the mismatched correlation id
+     * As a simple approach, we force SASL requests to use reserved correlation ids that are separate from those
+     * used in NetworkClient for Kafka requests. Hence, a SASL request that is matched with a non-SASL response during
+     * re-authentication is guaranteed to fail with a SchemaException because of the correlation id mismatch.
+     */
+    public static final int MAX_RESERVED_CORRELATION_ID = Integer.MAX_VALUE;
+
+    /**
+     * We only expect one request in flight at a time during authentication, so a small range is fine.
+     */
+    public static final int MIN_RESERVED_CORRELATION_ID = MAX_RESERVED_CORRELATION_ID - 7;
+
+    /**
+     * @return true if the correlation id is reserved for SASL requests; otherwise, false
+     */
+    public static boolean isReserved(int correlationId) {
+        return correlationId >= MIN_RESERVED_CORRELATION_ID;
+    }
+
     private final Subject subject;
     private final String servicePrincipal;
     private final String host;
@@ -178,7 +207,8 @@ public class SaslClientAuthenticator implements Authenticator {
         }
     }
 
-    private SaslClient createSaslClient() {
+    // visible for testing
+    SaslClient createSaslClient() {
         try {
             return Subject.doAs(subject, (PrivilegedExceptionAction<SaslClient>) () -> {
                 String[] mechs = {mechanism};
@@ -326,6 +356,13 @@ public class SaslClientAuthenticator implements Authenticator {
         return reauthInfo.reauthenticationLatencyMs();
     }
 
+    // visible for testing
+    int nextCorrelationId() {
+        if (!isReserved(correlationId))
+            correlationId = MIN_RESERVED_CORRELATION_ID;
+        return correlationId++;
+    }
+
     private RequestHeader nextRequestHeader(ApiKeys apiKey, short version) {
         String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
         short requestApiKey = apiKey.id;
@@ -334,7 +371,7 @@ public class SaslClientAuthenticator implements Authenticator {
                 setRequestApiKey(requestApiKey).
                 setRequestApiVersion(version).
                 setClientId(clientId).
-                setCorrelationId(correlationId++),
+                setCorrelationId(nextCorrelationId()),
             apiKey.requestHeaderVersion(version));
         return currentRequestHeader;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index ccb0fee..c4bde51 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.DelayedReceive;
@@ -53,6 +54,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
 import static org.junit.Assert.assertEquals;
@@ -862,6 +866,16 @@ public class NetworkClientTest {
         assertTrue(client.canConnect(node, time.milliseconds()));
     }
 
+    @Test
+    public void testCorrelationId() {
+        int count = 100;
+        Set<Integer> ids = IntStream.range(0, count)
+            .mapToObj(i -> client.nextCorrelationId())
+            .collect(Collectors.toSet());
+        assertEquals(count, ids.size());
+        ids.forEach(id -> assertTrue(id < SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID));
+    }
+
     private RequestHeader parseHeader(ByteBuffer buffer) {
         buffer.getInt(); // skip size
         return RequestHeader.parse(buffer.slice());
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 6c7d7c8..b9c74c8 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -29,10 +29,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Base64.Encoder;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -44,10 +48,12 @@ import javax.security.auth.login.Configuration;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
@@ -75,6 +81,8 @@ import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.security.auth.Login;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -115,9 +123,11 @@ import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandl
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -1537,6 +1547,48 @@ public class SaslAuthenticatorTest {
             server.verifyReauthenticationMetrics(0, 1);
         }
     }
+
+    @Test
+    public void testCorrelationId() {
+        SaslClientAuthenticator authenticator = new SaslClientAuthenticator(
+              Collections.emptyMap(),
+              null,
+              "node",
+              null,
+              null,
+              null,
+              "plain",
+              false,
+              null,
+              null,
+            new LogContext()
+        ) {
+            @Override
+            SaslClient createSaslClient() {
+                return null;
+            }
+        };
+        int count = (SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID - SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID) * 2;
+        Set<Integer> ids = IntStream.range(0, count)
+            .mapToObj(i -> authenticator.nextCorrelationId())
+            .collect(Collectors.toSet());
+        assertEquals(SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID - SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID + 1, ids.size());
+        ids.forEach(id -> {
+            assertTrue(id >= SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID);
+            assertTrue(SaslClientAuthenticator.isReserved(id));
+        });
+    }
+
+    @Test
+    public void testConvertListOffsetResponseToSaslHandshakeResponse() {
+        ListOffsetResponse response = new ListOffsetResponse(0, Collections.singletonMap(new TopicPartition("topic", 0),
+            new ListOffsetResponse.PartitionData(Errors.NONE, 0, 0, Optional.empty())));
+        ByteBuffer buffer = response.serialize(ApiKeys.LIST_OFFSETS, LIST_OFFSETS.latestVersion(), 0);
+        final RequestHeader header0 = new RequestHeader(LIST_OFFSETS, LIST_OFFSETS.latestVersion(), "id", SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID);
+        Assert.assertThrows(SchemaException.class, () -> NetworkClient.parseResponse(buffer.duplicate(), header0));
+        final RequestHeader header1 = new RequestHeader(LIST_OFFSETS, LIST_OFFSETS.latestVersion(), "id", 1);
+        Assert.assertThrows(IllegalStateException.class, () -> NetworkClient.parseResponse(buffer.duplicate(), header1));
+    }
     
     /**
      * Re-authentication must fail if mechanism changes