You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/12 22:29:52 UTC

[kafka] branch trunk updated: KAFKA-5944: Unit tests for handling SASL authentication failures in clients (#3965)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 596d3d0  KAFKA-5944: Unit tests for handling SASL authentication failures in clients (#3965)
596d3d0 is described below

commit 596d3d0ec4ee1ba719ce87e464a095bd63ab72be
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Mon Feb 12 14:29:44 2018 -0800

    KAFKA-5944: Unit tests for handling SASL authentication failures in clients (#3965)
---
 .../java/org/apache/kafka/clients/MockClient.java  | 13 +++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 72 ++++++++++++++++++-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 83 ++++++++++++++++++++++
 .../internals/AbstractCoordinatorTest.java         | 25 +++++++
 .../internals/ConsumerCoordinatorTest.java         | 23 ++++++
 .../internals/ConsumerNetworkClientTest.java       | 19 +++++
 6 files changed, 232 insertions(+), 3 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d843414..65255fe 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@ public class MockClient implements KafkaClient {
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
+    private final Map<Node, AuthenticationException> authenticationException = new HashMap<>();
     // Use concurrent queue for requests so that requests may be queried from a different thread
     private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
     // Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
@@ -102,7 +104,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public boolean ready(Node node, long now) {
-        if (isBlackedOut(node))
+        if (isBlackedOut(node) || authenticationException(node) != null)
             return false;
         ready.add(node.idString());
         return true;
@@ -117,6 +119,12 @@ public class MockClient implements KafkaClient {
         blackedOut.put(node, time.milliseconds() + duration);
     }
 
+    public void authenticationFailed(Node node, long duration) {
+        authenticationException.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
+        disconnect(node.idString());
+        blackout(node, duration);
+    }
+
     private boolean isBlackedOut(Node node) {
         if (blackedOut.containsKey(node)) {
             long expiration = blackedOut.get(node);
@@ -137,7 +145,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public AuthenticationException authenticationException(Node node) {
-        return null;
+        return authenticationException.get(node);
     }
 
     @Override
@@ -347,6 +355,7 @@ public class MockClient implements KafkaClient {
         responses.clear();
         futureResponses.clear();
         metadataUpdates.clear();
+        authenticationException.clear();
     }
 
     public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 186ccf0..f08a99b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -248,6 +249,75 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws Exception {
+        AdminClientUnitTestEnv env = mockClientEnv();
+        Node node = env.cluster().controller();
+        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+        env.kafkaClient().setNode(node);
+        env.kafkaClient().authenticationFailed(node, 300);
+
+        callAdminClientApisAndExpectAnAuthenticationError(env);
+
+        // wait less than the blackout period, the connection should fail and the authentication error should remain
+        env.time().sleep(30);
+        assertTrue(env.kafkaClient().connectionFailed(node));
+        callAdminClientApisAndExpectAnAuthenticationError(env);
+
+        env.close();
+    }
+
+    private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
+        env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+
+        try {
+            env.adminClient().createTopics(
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
+                    new CreateTopicsOptions().timeoutMs(10000)).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+        }
+
+        try {
+            Map<String, NewPartitions> counts = new HashMap<>();
+            counts.put("my_topic", NewPartitions.increaseTo(3));
+            counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3))));
+            env.adminClient().createPartitions(counts).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+        }
+
+        try {
+            env.adminClient().createAcls(asList(ACL1, ACL2)).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+        }
+
+        try {
+            env.adminClient().describeAcls(FILTER1).values().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+        }
+
+        try {
+            env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+        }
+
+        try {
+            env.adminClient().describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get();
+            fail("Expected an authentication error.");
+        } catch (ExecutionException e) {
+            assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
+        }
+    }
+
     private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
     private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"),
@@ -579,7 +649,7 @@ public class KafkaAdminClientTest {
         private int numTries = 0;
 
         private int failuresInjected = 0;
-        
+
         @Override
         public KafkaAdminClient.TimeoutProcessor create(long now) {
             return new FailureInjectingTimeoutProcessor(now);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index d47124f..be8db2b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -1431,6 +1432,88 @@ public class KafkaConsumerTest {
         }
     }
 
+    @Test
+    public void testConsumerWithinBlackoutPeriodAfterAuthenticationFailure() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        Map<String, Integer> tpCounts = new HashMap<>();
+        tpCounts.put(topic, 1);
+        Cluster cluster = TestUtils.singletonCluster(tpCounts);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = createMetadata();
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        client.authenticationFailed(node, 300);
+        PartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+        consumer.subscribe(Collections.singleton(topic));
+        callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
+
+        time.sleep(30); // wait less than the blackout period
+        assertTrue(client.connectionFailed(node));
+        callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
+
+        client.requests().clear();
+        consumer.close(0, TimeUnit.MILLISECONDS);
+    }
+
+    private void callConsumerApisAndExpectAnAuthenticationError(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
+        try {
+            consumer.partitionsFor("some other topic");
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        try {
+            consumer.beginningOffsets(Collections.singleton(partition));
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        try {
+            consumer.endOffsets(Collections.singleton(partition));
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        try {
+            consumer.poll(10);
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
+        offset.put(partition, new OffsetAndMetadata(10L));
+
+        try {
+            consumer.commitSync(offset);
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        try {
+            consumer.committed(partition);
+            fail("Expected an authentication error!");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+    }
+
     private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
         return new ConsumerRebalanceListener() {
             @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 7eaca98..1c88803 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -519,6 +520,30 @@ public class AbstractCoordinatorTest {
         awaitFirstHeartbeat(heartbeatReceived);
     }
 
+    @Test
+    public void testEnsureCoordinatorReadyWithinBlackoutPeriodAfterAuthenticationFailure() {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
+        mockClient.authenticationFailed(node, 300);
+
+        try {
+            coordinator.ensureCoordinatorReady();
+            fail("Expected an authentication error.");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        mockTime.sleep(30); // wait less than the blackout period
+        assertTrue(mockClient.connectionFailed(node));
+
+        try {
+            coordinator.ensureCoordinatorReady();
+            fail("Expected an authentication error.");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+    }
+
     private AtomicBoolean prepareFirstHeartbeat() {
         final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c49339b..fdaa6b3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
@@ -1504,6 +1505,28 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure() {
+        client.authenticationFailed(node, 300);
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Expected an authentication error.");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+
+        time.sleep(30); // wait less than the blackout period
+        assertTrue(client.connectionFailed(node));
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Expected an authentication error.");
+        } catch (AuthenticationException e) {
+            // OK
+        }
+    }
+
+    @Test
     public void testProtocolMetadataOrder() {
         RoundRobinAssignor roundRobin = new RoundRobinAssignor();
         RangeAssignor range = new RangeAssignor();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 93c6acd..904270e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
@@ -70,6 +71,24 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
+    public void sendWithinBlackoutPeriodAfterAuthenticationFailure() throws InterruptedException {
+        client.authenticationFailed(node, 300);
+        client.prepareResponse(heartbeatResponse(Errors.NONE));
+        final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+        consumerClient.poll(future);
+        assertTrue(future.failed());
+        assertTrue("Expected only an authentication error.", future.exception() instanceof AuthenticationException);
+
+        time.sleep(30); // wait less than the blackout period
+        assertTrue(client.connectionFailed(node));
+
+        final RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat());
+        consumerClient.poll(future2);
+        assertTrue(future2.failed());
+        assertTrue("Expected only an authentication error.", future2.exception() instanceof AuthenticationException);
+    }
+
+    @Test
     public void multiSend() {
         client.prepareResponse(heartbeatResponse(Errors.NONE));
         client.prepareResponse(heartbeatResponse(Errors.NONE));

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.