You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/11 18:23:54 UTC
[kafka] branch 2.1 updated: KAFKA-7633: Allow Kafka Connect to
access internal topics without cluster ACLs (#5918)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new c4394bd KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (#5918)
c4394bd is described below
commit c4394bd6663a915a2f7efe045bf93645ee6574cf
Author: Arabelle Hou <ar...@gmail.com>
AuthorDate: Sat May 11 11:05:37 2019 -0700
KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (#5918)
When Kafka Connect does not have cluster ACLs to create topics,
it fails to even access its internal topics which already exist.
This was originally fixed in KAFKA-6250 by ignoring the cluster
authorization error, but now Kafka 2.0 returns a different response
code that corresponds to a different error. Add a patch to ignore this
new error as well.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../org/apache/kafka/connect/util/TopicAdmin.java | 7 +++++++
.../org/apache/kafka/connect/util/TopicAdminTest.java | 19 ++++++++++++++++++-
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index ad21561..72a5981 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -246,6 +247,12 @@ public class TopicAdmin implements AutoCloseable {
topicNameList, bootstrapServers);
return Collections.emptySet();
}
+ if (cause instanceof TopicAuthorizationException) {
+ log.debug("Not authorized to create topic(s) '{}'." +
+ " Falling back to assume topic(s) exist or will be auto-created by the broker.",
+ topicNameList, bootstrapServers);
+ return Collections.emptySet();
+ }
if (cause instanceof TimeoutException) {
// Timed out waiting for the operation to complete
throw new ConnectException("Timed out while checking for or creating topic(s) '" + topicNameList + "'." +
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 5b1e155..7b6f47c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -17,8 +17,8 @@
package org.apache.kafka.connect.util;
import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
+import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -73,6 +73,19 @@ public class TopicAdminTest {
}
@Test
+ public void returnNullWithTopicAuthorizationFailure() {
+ final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+ Cluster cluster = createCluster(1);
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+ env.kafkaClient().setNode(cluster.nodes().iterator().next());
+ env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic));
+ TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+ boolean created = admin.createTopic(newTopic);
+ assertFalse(created);
+ }
+ }
+
+ @Test
public void shouldNotCreateTopicWhenItAlreadyExists() {
NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
@@ -136,6 +149,10 @@ public class TopicAdminTest {
return createTopicResponse(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
}
+ private CreateTopicsResponse createTopicResponseWithTopicAuthorizationException(NewTopic... topics) {
+ return createTopicResponse(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
+ }
+
private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
if (error == null) error = new ApiError(Errors.NONE, "");
Map<String, ApiError> topicResults = new HashMap<>();