You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/12 22:29:52 UTC
[kafka] branch trunk updated: KAFKA-5944: Unit tests for handling SASL authentication failures in clients (#3965)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 596d3d0 KAFKA-5944: Unit tests for handling SASL authentication failures in clients (#3965)
596d3d0 is described below
commit 596d3d0ec4ee1ba719ce87e464a095bd63ab72be
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Mon Feb 12 14:29:44 2018 -0800
KAFKA-5944: Unit tests for handling SASL authentication failures in clients (#3965)
---
.../java/org/apache/kafka/clients/MockClient.java | 13 +++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 72 ++++++++++++++++++-
.../kafka/clients/consumer/KafkaConsumerTest.java | 83 ++++++++++++++++++++++
.../internals/AbstractCoordinatorTest.java | 25 +++++++
.../internals/ConsumerCoordinatorTest.java | 23 ++++++
.../internals/ConsumerNetworkClientTest.java | 19 +++++
6 files changed, 232 insertions(+), 3 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d843414..65255fe 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@ public class MockClient implements KafkaClient {
private Node node = null;
private final Set<String> ready = new HashSet<>();
private final Map<Node, Long> blackedOut = new HashMap<>();
+ private final Map<Node, AuthenticationException> authenticationException = new HashMap<>();
// Use concurrent queue for requests so that requests may be queried from a different thread
private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
// Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
@@ -102,7 +104,7 @@ public class MockClient implements KafkaClient {
@Override
public boolean ready(Node node, long now) {
- if (isBlackedOut(node))
+ if (isBlackedOut(node) || authenticationException(node) != null)
return false;
ready.add(node.idString());
return true;
@@ -117,6 +119,12 @@ public class MockClient implements KafkaClient {
blackedOut.put(node, time.milliseconds() + duration);
}
+ public void authenticationFailed(Node node, long duration) {
+ authenticationException.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
+ disconnect(node.idString());
+ blackout(node, duration);
+ }
+
private boolean isBlackedOut(Node node) {
if (blackedOut.containsKey(node)) {
long expiration = blackedOut.get(node);
@@ -137,7 +145,7 @@ public class MockClient implements KafkaClient {
@Override
public AuthenticationException authenticationException(Node node) {
- return null;
+ return authenticationException.get(node);
}
@Override
@@ -347,6 +355,7 @@ public class MockClient implements KafkaClient {
responses.clear();
futureResponses.clear();
metadataUpdates.clear();
+ authenticationException.clear();
}
public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 186ccf0..f08a99b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -248,6 +249,75 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws Exception {
+ AdminClientUnitTestEnv env = mockClientEnv();
+ Node node = env.cluster().controller();
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().setNode(node);
+ env.kafkaClient().authenticationFailed(node, 300);
+
+ callAdminClientApisAndExpectAnAuthenticationError(env);
+
+ // wait less than the blackout period, the connection should fail and the authentication error should remain
+ env.time().sleep(30);
+ assertTrue(env.kafkaClient().connectionFailed(node));
+ callAdminClientApisAndExpectAnAuthenticationError(env);
+
+ env.close();
+ }
+
+ private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
+ env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+
+ try {
+ env.adminClient().createTopics(
+ Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+ new CreateTopicsOptions().timeoutMs(10000)).all().get();
+ fail("Expected an authentication error.");
+ } catch (ExecutionException e) {
+ assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+ }
+
+ try {
+ Map<String, NewPartitions> counts = new HashMap<>();
+ counts.put("my_topic", NewPartitions.increaseTo(3));
+ counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3))));
+ env.adminClient().createPartitions(counts).all().get();
+ fail("Expected an authentication error.");
+ } catch (ExecutionException e) {
+ assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+ }
+
+ try {
+ env.adminClient().createAcls(asList(ACL1, ACL2)).all().get();
+ fail("Expected an authentication error.");
+ } catch (ExecutionException e) {
+ assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+ }
+
+ try {
+ env.adminClient().describeAcls(FILTER1).values().get();
+ fail("Expected an authentication error.");
+ } catch (ExecutionException e) {
+ assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+ }
+
+ try {
+ env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get();
+ fail("Expected an authentication error.");
+ } catch (ExecutionException e) {
+ assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+ }
+
+ try {
+ env.adminClient().describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get();
+ fail("Expected an authentication error.");
+ } catch (ExecutionException e) {
+ assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+ }
+ }
+
private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"),
@@ -579,7 +649,7 @@ public class KafkaAdminClientTest {
private int numTries = 0;
private int failuresInjected = 0;
-
+
@Override
public KafkaAdminClient.TimeoutProcessor create(long now) {
return new FailureInjectingTimeoutProcessor(now);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index d47124f..be8db2b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
@@ -1431,6 +1432,88 @@ public class KafkaConsumerTest {
}
}
+ @Test
+ public void testConsumerWithinBlackoutPeriodAfterAuthenticationFailure() {
+ int rebalanceTimeoutMs = 60000;
+ int sessionTimeoutMs = 30000;
+ int heartbeatIntervalMs = 3000;
+ int autoCommitIntervalMs = 1000;
+
+ Time time = new MockTime();
+ Map<String, Integer> tpCounts = new HashMap<>();
+ tpCounts.put(topic, 1);
+ Cluster cluster = TestUtils.singletonCluster(tpCounts);
+ Node node = cluster.nodes().get(0);
+
+ Metadata metadata = createMetadata();
+ metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+ MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
+ client.authenticationFailed(node, 300);
+ PartitionAssignor assignor = new RangeAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+ consumer.subscribe(Collections.singleton(topic));
+ callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
+
+ time.sleep(30); // wait less than the blackout period
+ assertTrue(client.connectionFailed(node));
+ callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
+
+ client.requests().clear();
+ consumer.close(0, TimeUnit.MILLISECONDS);
+ }
+
+ private void callConsumerApisAndExpectAnAuthenticationError(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+ try {
+ consumer.partitionsFor("some other topic");
+ fail("Expected an authentication error!");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+
+ try {
+ consumer.beginningOffsets(Collections.singleton(partition));
+ fail("Expected an authentication error!");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+
+ try {
+ consumer.endOffsets(Collections.singleton(partition));
+ fail("Expected an authentication error!");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+
+ try {
+ consumer.poll(10);
+ fail("Expected an authentication error!");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+
+ Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
+ offset.put(partition, new OffsetAndMetadata(10L));
+
+ try {
+ consumer.commitSync(offset);
+ fail("Expected an authentication error!");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+
+ try {
+ consumer.committed(partition);
+ fail("Expected an authentication error!");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+ }
+
private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
return new ConsumerRebalanceListener() {
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 7eaca98..1c88803 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -519,6 +520,30 @@ public class AbstractCoordinatorTest {
awaitFirstHeartbeat(heartbeatReceived);
}
+ @Test
+ public void testEnsureCoordinatorReadyWithinBlackoutPeriodAfterAuthenticationFailure() {
+ setupCoordinator(RETRY_BACKOFF_MS);
+
+ mockClient.authenticationFailed(node, 300);
+
+ try {
+ coordinator.ensureCoordinatorReady();
+ fail("Expected an authentication error.");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+
+ mockTime.sleep(30); // wait less than the blackout period
+ assertTrue(mockClient.connectionFailed(node));
+
+ try {
+ coordinator.ensureCoordinatorReady();
+ fail("Expected an authentication error.");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+ }
+
private AtomicBoolean prepareFirstHeartbeat() {
final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
mockClient.prepareResponse(new MockClient.RequestMatcher() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c49339b..fdaa6b3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
@@ -1504,6 +1505,28 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure() {
+ client.authenticationFailed(node, 300);
+
+ try {
+ coordinator.ensureActiveGroup();
+ fail("Expected an authentication error.");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+
+ time.sleep(30); // wait less than the blackout period
+ assertTrue(client.connectionFailed(node));
+
+ try {
+ coordinator.ensureActiveGroup();
+ fail("Expected an authentication error.");
+ } catch (AuthenticationException e) {
+ // OK
+ }
+ }
+
+ @Test
public void testProtocolMetadataOrder() {
RoundRobinAssignor roundRobin = new RoundRobinAssignor();
RangeAssignor range = new RangeAssignor();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 93c6acd..904270e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.Errors;
@@ -70,6 +71,24 @@ public class ConsumerNetworkClientTest {
}
@Test
+ public void sendWithinBlackoutPeriodAfterAuthenticationFailure() throws InterruptedException {
+ client.authenticationFailed(node, 300);
+ client.prepareResponse(heartbeatResponse(Errors.NONE));
+ final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+ consumerClient.poll(future);
+ assertTrue(future.failed());
+ assertTrue("Expected only an authentication error.", future.exception() instanceof AuthenticationException);
+
+ time.sleep(30); // wait less than the blackout period
+ assertTrue(client.connectionFailed(node));
+
+ final RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat());
+ consumerClient.poll(future2);
+ assertTrue(future2.failed());
+ assertTrue("Expected only an authentication error.", future2.exception() instanceof AuthenticationException);
+ }
+
+ @Test
public void multiSend() {
client.prepareResponse(heartbeatResponse(Errors.NONE));
client.prepareResponse(heartbeatResponse(Errors.NONE));
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.