You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/11 23:52:55 UTC

[kafka] branch trunk updated: KAFKA-6250: Use existing Kafka Connect internal topics without requiring ACL (#4247)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 936e81a  KAFKA-6250: Use existing Kafka Connect internal topics without requiring ACL (#4247)
936e81a is described below

commit 936e81afcb0488e65a3dbca688c768b0100fe04a
Author: Gavrie Philipson <ga...@philipson.co.il>
AuthorDate: Fri Jan 12 01:52:50 2018 +0200

    KAFKA-6250: Use existing Kafka Connect internal topics without requiring ACL (#4247)
    
    When using Kafka Connect with a cluster that doesn't allow the user to
    create topics (due to ACL configuration), Connect fails when trying to
    create its internal topics even if these topics already exist. This is
    incorrect behavior according to the documentation, which mentions that
    R/W access should be enough.
    
    This happens specifically when using Aiven Kafka, which does not permit
    creation of topics via the Kafka Admin Client API.
    
    The patch ignores the returned error, similar to the behavior for older
    brokers that don't support the API.
---
 .../java/org/apache/kafka/connect/util/TopicAdmin.java  | 14 +++++++++++---
 .../org/apache/kafka/connect/util/TopicAdminTest.java   | 17 +++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 5da4f2d..ad21561 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -229,13 +230,20 @@ public class TopicAdmin implements AutoCloseable {
                 newlyCreatedTopicNames.add(topic);
             } catch (ExecutionException e) {
                 Throwable cause = e.getCause();
-                if (e.getCause() instanceof TopicExistsException) {
+                if (cause instanceof TopicExistsException) {
                     log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers);
                     continue;
                 }
                 if (cause instanceof UnsupportedVersionException) {
-                    log.debug("Unable to use Kafka admin client to create topic descriptions for '{}' using the brokers at {}," +
-                                      "falling back to assume topic(s) exist or will be auto-created by the broker", topicNameList, bootstrapServers);
+                    log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API." +
+                            " Falling back to assume topic(s) exist or will be auto-created by the broker.",
+                            topicNameList, bootstrapServers);
+                    return Collections.emptySet();
+                }
+                if (cause instanceof ClusterAuthorizationException) {
+                    log.debug("Not authorized to create topic(s) '{}' upon the brokers {}." +
+                            " Falling back to assume topic(s) exist or will be auto-created by the broker.",
+                            topicNameList, bootstrapServers);
                     return Collections.emptySet();
                 }
                 if (cause instanceof TimeoutException) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index c58d674..cda6879 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -61,6 +61,19 @@ public class TopicAdminTest {
     }
 
     @Test
+    public void returnNullWithClusterAuthorizationFailure() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean created = admin.createTopic(newTopic);
+            assertFalse(created);
+        }
+    }
+
+    @Test
     public void shouldNotCreateTopicWhenItAlreadyExists() {
         NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
@@ -120,6 +133,10 @@ public class TopicAdminTest {
         return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics);
     }
 
+    private CreateTopicsResponse createTopicResponseWithClusterAuthorizationException(NewTopic... topics) {
+        return createTopicResponse(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
+    }
+
     private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
         if (error == null) error = new ApiError(Errors.NONE, "");
         Map<String, ApiError> topicResults = new HashMap<>();

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].