You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/04/20 11:03:21 UTC
[kafka] branch 2.4 updated: KAFKA-9854 Re-authenticating causes
mismatched parse of response (#8471)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new c94e4e7 KAFKA-9854 Re-authenticating causes mismatched parse of response (#8471)
c94e4e7 is described below
commit c94e4e722f2de5ef7755e715cb371db404718e27
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu Apr 16 20:26:30 2020 +0800
KAFKA-9854 Re-authenticating causes mismatched parse of response (#8471)
Reviewers: Rajini Sivaram <ra...@googlemail.com>, Ron Dagostino <rd...@confluent.io>
---
.../org/apache/kafka/clients/NetworkClient.java | 21 ++++++++-
.../authenticator/SaslClientAuthenticator.java | 41 ++++++++++++++++-
.../apache/kafka/clients/NetworkClientTest.java | 14 ++++++
.../authenticator/SaslAuthenticatorTest.java | 51 ++++++++++++++++++++++
4 files changed, 123 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3431b83..9e52b4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
@@ -41,6 +42,7 @@ import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -932,9 +934,15 @@ public class NetworkClient implements KafkaClient {
* Validate that the response corresponds to the request we expect or else explode
*/
private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
- if (requestHeader.correlationId() != responseHeader.correlationId())
+ if (requestHeader.correlationId() != responseHeader.correlationId()) {
+ if (SaslClientAuthenticator.isReserved(requestHeader.correlationId())
+ && !SaslClientAuthenticator.isReserved(responseHeader.correlationId()))
+ throw new SchemaException("the response is unrelated to Sasl request since its correlation id is " + responseHeader.correlationId()
+ + " and the reserved range for Sasl request is [ "
+ + SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID + "," + SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID + "]");
throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
+ ") does not match request (" + requestHeader.correlationId() + "), request header: " + requestHeader);
+ }
}
/**
@@ -1139,6 +1147,15 @@ public class NetworkClient implements KafkaClient {
return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, defaultRequestTimeoutMs, null);
}
+ // visible for testing
+ int nextCorrelationId() {
+ if (SaslClientAuthenticator.isReserved(correlation)) {
+ // the numeric overflow is fine as negative values are acceptable
+ correlation = SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID + 1;
+ }
+ return correlation++;
+ }
+
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder,
@@ -1146,7 +1163,7 @@ public class NetworkClient implements KafkaClient {
boolean expectResponse,
int requestTimeoutMs,
RequestCompletionHandler callback) {
- return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, createdTimeMs, expectResponse,
+ return new ClientRequest(nodeId, requestBuilder, nextCorrelationId(), clientId, createdTimeMs, expectResponse,
requestTimeoutMs, callback);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 6aa68cd..c75bb82 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -106,6 +106,35 @@ public class SaslClientAuthenticator implements Authenticator {
private static final short DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER = -1;
private static final Random RNG = new Random();
+ /**
+ * the reserved range of correlation id for Sasl requests.
+ *
+ * Note: the reserved range exists for a reason. The response of LIST_OFFSET is schema-compatible with the response of SASL_HANDSHAKE.
+ * Hence, we could miss the schema error when using the schema of SASL_HANDSHAKE to parse a response of LIST_OFFSET.
+ * For example, an IllegalStateException caused by a mismatched correlation id is thrown if the following steps happen:
+ * 1) sent LIST_OFFSET
+ * 2) sent SASL_HANDSHAKE
+ * 3) receive response of LIST_OFFSET
+ * 4) succeed to use schema of SASL_HANDSHAKE to parse response of LIST_OFFSET
+ * 5) throw IllegalStateException due to mismatched correlation id
+ * As a simple approach, we force SASL requests to use a reserved correlation id range that is separate from the ids
+ * used in NetworkClient for Kafka requests. Hence, we can guarantee that a Kafka response mistakenly matched against a
+ * SASL request fails with SchemaException (instead of IllegalStateException) due to the correlation id mismatch during reauthentication.
+ */
+ public static final int MAX_RESERVED_CORRELATION_ID = Integer.MAX_VALUE;
+
+ /**
+ * We only expect one request in flight at a time during authentication, so the small range is fine.
+ */
+ public static final int MIN_RESERVED_CORRELATION_ID = MAX_RESERVED_CORRELATION_ID - 7;
+
+ /**
+ * @return true if the correlation id is reserved for SASL requests; otherwise, false
+ */
+ public static boolean isReserved(int correlationId) {
+ return correlationId >= MIN_RESERVED_CORRELATION_ID;
+ }
+
private final Subject subject;
private final String servicePrincipal;
private final String host;
@@ -174,7 +203,8 @@ public class SaslClientAuthenticator implements Authenticator {
}
}
- private SaslClient createSaslClient() {
+ // visible for testing
+ SaslClient createSaslClient() {
try {
return Subject.doAs(subject, (PrivilegedExceptionAction<SaslClient>) () -> {
String[] mechs = {mechanism};
@@ -323,6 +353,13 @@ public class SaslClientAuthenticator implements Authenticator {
return reauthInfo.reauthenticationLatencyMs();
}
+ // visible for testing
+ int nextCorrelationId() {
+ if (!isReserved(correlationId))
+ correlationId = MIN_RESERVED_CORRELATION_ID;
+ return correlationId++;
+ }
+
private RequestHeader nextRequestHeader(ApiKeys apiKey, short version) {
String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
short requestApiKey = apiKey.id;
@@ -331,7 +368,7 @@ public class SaslClientAuthenticator implements Authenticator {
setRequestApiKey(requestApiKey).
setRequestApiVersion(version).
setClientId(clientId).
- setCorrelationId(correlationId++),
+ setCorrelationId(nextCorrelationId()),
apiKey.requestHeaderVersion(version));
return currentRequestHeader;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index a4145d1..e445c55 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.DelayedReceive;
@@ -53,6 +54,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
import static org.junit.Assert.assertEquals;
@@ -862,6 +866,16 @@ public class NetworkClientTest {
assertTrue(client.canConnect(node, time.milliseconds()));
}
+ @Test
+ public void testCorrelationId() {
+ int count = 100;
+ Set<Integer> ids = IntStream.range(0, count)
+ .mapToObj(i -> client.nextCorrelationId())
+ .collect(Collectors.toSet());
+ assertEquals(count, ids.size());
+ ids.forEach(id -> assertTrue(id < SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID));
+ }
+
private RequestHeader parseHeader(ByteBuffer buffer) {
buffer.getInt(); // skip size
return RequestHeader.parse(buffer.slice());
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 6d95d2e..dd3c61d 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -29,10 +29,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.Base64.Encoder;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
@@ -44,10 +48,12 @@ import javax.security.auth.login.Configuration;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
@@ -74,6 +80,8 @@ import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -113,9 +121,11 @@ import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandl
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -1506,6 +1516,47 @@ public class SaslAuthenticatorTest {
server.verifyReauthenticationMetrics(0, 1);
}
}
+
+ @Test
+ public void testCorrelationId() {
+ SaslClientAuthenticator authenticator = new SaslClientAuthenticator(
+ Collections.emptyMap(),
+ null,
+ "node",
+ null,
+ null,
+ null,
+ "plain",
+ false,
+ null,
+ null
+ ) {
+ @Override
+ SaslClient createSaslClient() {
+ return null;
+ }
+ };
+ int count = (SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID - SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID) * 2;
+ Set<Integer> ids = IntStream.range(0, count)
+ .mapToObj(i -> authenticator.nextCorrelationId())
+ .collect(Collectors.toSet());
+ assertEquals(SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID - SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID + 1, ids.size());
+ ids.forEach(id -> {
+ assertTrue(id >= SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID);
+ assertTrue(SaslClientAuthenticator.isReserved(id));
+ });
+ }
+
+ @Test
+ public void testConvertListOffsetResponseToSaslHandshakeResponse() {
+ ListOffsetResponse response = new ListOffsetResponse(0, Collections.singletonMap(new TopicPartition("topic", 0),
+ new ListOffsetResponse.PartitionData(Errors.NONE, 0, 0, Optional.empty())));
+ ByteBuffer buffer = response.serialize(ApiKeys.LIST_OFFSETS, LIST_OFFSETS.latestVersion(), 0);
+ final RequestHeader header0 = new RequestHeader(LIST_OFFSETS, LIST_OFFSETS.latestVersion(), "id", SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID);
+ Assert.assertThrows(SchemaException.class, () -> NetworkClient.parseResponse(buffer.duplicate(), header0));
+ final RequestHeader header1 = new RequestHeader(LIST_OFFSETS, LIST_OFFSETS.latestVersion(), "id", 1);
+ Assert.assertThrows(IllegalStateException.class, () -> NetworkClient.parseResponse(buffer.duplicate(), header1));
+ }
/**
* Re-authentication must fail if mechanism changes