You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/11 18:06:05 UTC

[kafka] branch trunk updated: KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (#5918)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4c85171  KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (#5918)
4c85171 is described below

commit 4c85171a1f1219bb735be3ca034640e298fda0dc
Author: Arabelle Hou <ar...@gmail.com>
AuthorDate: Sat May 11 11:05:37 2019 -0700

    KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (#5918)
    
    When Kafka Connect does not have cluster ACLs to create topics,
    it fails to even access its internal topics which already exist.
    This was originally fixed in KAFKA-6250 by ignoring the cluster
    authorization error, but Kafka 2.0 now returns a different error
    code, TOPIC_AUTHORIZATION_FAILED, in this situation. Ignore the
    resulting TopicAuthorizationException as well.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/connect/util/TopicAdmin.java   |  7 +++++++
 .../org/apache/kafka/connect/util/TopicAdminTest.java    | 16 ++++++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index ad21561..72a5981 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -246,6 +247,12 @@ public class TopicAdmin implements AutoCloseable {
                             topicNameList, bootstrapServers);
                     return Collections.emptySet();
                 }
+                if (cause instanceof TopicAuthorizationException) {
+                    log.debug("Not authorized to create topic(s) '{}'." +
+                                    " Falling back to assume topic(s) exist or will be auto-created by the broker.",
+                            topicNameList, bootstrapServers);
+                    return Collections.emptySet();
+                }
                 if (cause instanceof TimeoutException) {
                     // Timed out waiting for the operation to complete
                     throw new ConnectException("Timed out while checking for or creating topic(s) '" + topicNameList + "'." +
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 64ddfeb..cfcab32 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -73,6 +73,18 @@ public class TopicAdminTest {
     }
 
     @Test
+    public void returnNullWithTopicAuthorizationFailure() { // NOTE(review): despite "returnNull" in the name, this test asserts that createTopic returns false
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic)); // queue a create-topics response carrying TOPIC_AUTHORIZATION_FAILED
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean created = admin.createTopic(newTopic);
+            assertFalse(created); // the authorization failure must surface as "not created" rather than as a thrown exception
+        }
+    }
+
+    @Test
     public void shouldNotCreateTopicWhenItAlreadyExists() {
         NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
@@ -136,6 +148,10 @@ public class TopicAdminTest {
         return createTopicResponse(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
     }
 
+    private CreateTopicsResponse createTopicResponseWithTopicAuthorizationException(NewTopic... topics) { // test helper: delegates to createTopicResponse with a TOPIC_AUTHORIZATION_FAILED error
+        return createTopicResponse(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
+    }
+
     private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
         if (error == null) error = new ApiError(Errors.NONE, "");
         CreateTopicsResponseData response = new CreateTopicsResponseData();