You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/11 18:23:54 UTC

[kafka] branch 2.1 updated: KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (#5918)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new c4394bd  KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (#5918)
c4394bd is described below

commit c4394bd6663a915a2f7efe045bf93645ee6574cf
Author: Arabelle Hou <ar...@gmail.com>
AuthorDate: Sat May 11 11:05:37 2019 -0700

    KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (#5918)
    
    When Kafka Connect does not have cluster ACLs to create topics,
    it fails to access even its internal topics that already exist.
    This was originally fixed in KAFKA-6250 by ignoring the cluster
    authorization error, but as of Kafka 2.0 the broker responds with a
    different error code, a topic authorization failure
    (TOPIC_AUTHORIZATION_FAILED). This patch ignores that error as well.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../org/apache/kafka/connect/util/TopicAdmin.java     |  7 +++++++
 .../org/apache/kafka/connect/util/TopicAdminTest.java | 19 ++++++++++++++++++-
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index ad21561..72a5981 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -246,6 +247,12 @@ public class TopicAdmin implements AutoCloseable {
                             topicNameList, bootstrapServers);
                     return Collections.emptySet();
                 }
+                if (cause instanceof TopicAuthorizationException) {
+                    log.debug("Not authorized to create topic(s) '{}'." +
+                                    " Falling back to assume topic(s) exist or will be auto-created by the broker.",
+                            topicNameList, bootstrapServers);
+                    return Collections.emptySet();
+                }
                 if (cause instanceof TimeoutException) {
                     // Timed out waiting for the operation to complete
                     throw new ConnectException("Timed out while checking for or creating topic(s) '" + topicNameList + "'." +
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 5b1e155..7b6f47c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
+import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -73,6 +73,19 @@ public class TopicAdminTest {
     }
 
     @Test
+    public void returnNullWithTopicAuthorizationFailure() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNode(cluster.nodes().iterator().next());
+            env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean created = admin.createTopic(newTopic);
+            assertFalse(created);
+        }
+    }
+
+    @Test
     public void shouldNotCreateTopicWhenItAlreadyExists() {
         NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
@@ -136,6 +149,10 @@ public class TopicAdminTest {
         return createTopicResponse(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
     }
 
+    private CreateTopicsResponse createTopicResponseWithTopicAuthorizationException(NewTopic... topics) {
+        return createTopicResponse(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
+    }
+
     private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
         if (error == null) error = new ApiError(Errors.NONE, "");
         Map<String, ApiError> topicResults = new HashMap<>();