You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "hudeqi (via GitHub)" <gi...@apache.org> on 2023/06/25 12:32:18 UTC

[GitHub] [kafka] hudeqi opened a new pull request, #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

hudeqi opened a new pull request, #13913:
URL: https://github.com/apache/kafka/pull/13913

   ### Activation
   n the “syncTopicAcls” thread of MirrorSourceConnector, full amount of "TopicAclBindings" related to the replicated topics of the source cluster will be regularly listed, and then fully updated to the target cluster. Therefore, a large number of repeated "TopicAclBindings" will be repeatedly sent  by calling "targetAdminClient". This action is redundant. In addition, if too many "TopicAclBindings" are updated at one time, it may also take a long time for the target cluster to handle processing the "createAcls" request, which will affect the accumulation of the request queue of the target cluster and further affect the processing delay of other type requests.
   
   ### Solution
   "TopicAclBinding" can be like the variable “knownConsumerGroups” in MirrorCheckpointConnector, and only update the incremental added "TopicAclBinding" every time, which can solve the above-mentioned problems.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1637307673

   > @hudeqi Sorry, I think there's a misunderstanding here. I'm not claiming that MM2 would be incapable of detecting changes in source cluster ACLs with this change; I'm worried that it would be unable to detect (or really, just overwrite) changes in target cluster ACLs, if they were made by a separate user/process from the MM2 cluster.
   
   Yes, the logic has changed before and after the change (reconfirm that there is no misunderstanding: the ACL of the target cluster is updated but the ACL of the source cluster is not updated. In this case, the ACL of the source cluster cannot be used to cover the target cluster). But look at this [document](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0), it seems that the goal of this ACL synchronization is only to synchronize the changes of the source cluster ACL? I don't know if I understand it right.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1630082775

   I have added the unit test for the related ”createAcl failure“ case, thanks for the review! @C0urante 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1606067371

   @C0urante @mimaison Hi, please help to review this PR if you have time, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1618169057

   Hi, I have updated, which show in unresolved conversation. @C0urante 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1259876757


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -581,13 +582,26 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
         }));
     }
 
-    private void updateTopicAcls(List<AclBinding> bindings) {
-        log.trace("Syncing {} topic ACL bindings.", bindings.size());
-        targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-            if (e != null) {
-                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-            }
-        }));
+    // Visible for testing
+    int updateTopicAcls(List<AclBinding> bindings) {
+        Set<AclBinding> addBindings = new HashSet<>(bindings);
+        Set<AclBinding> failedBindings = new HashSet<>();
+        addBindings.removeAll(knownTopicAclBindings);
+        int newBindCount = addBindings.size();
+        if (!addBindings.isEmpty()) {
+            log.info("Syncing new found {} topic ACL bindings.", newBindCount);
+            targetAdminClient.createAcls(addBindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+                if (e != null) {
+                    log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
+                    failedBindings.add(k);
+                }
+            }));
+            knownTopicAclBindings = new HashSet<>(bindings);
+            knownTopicAclBindings.removeAll(failedBindings);

Review Comment:
   Isn't `failedBindings` likely to be empty at this point unless the admin client is able to perform the request to create the ACL bindings exceptionally fast (which also happens to be the scenario we're covering in the unit test)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1625189960

   pin again @C0urante 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1606068264

   If this improvement is reasonable, I will add related unit test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1248004857


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
     }
 
     private void updateTopicAcls(List<AclBinding> bindings) {
-        log.trace("Syncing {} topic ACL bindings.", bindings.size());
-        targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-            if (e != null) {
-                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-            }
-        }));
+        Set<AclBinding> addBindings = new HashSet<>();
+        addBindings.addAll(bindings);

Review Comment:
   Nit: can simplify
   ```suggestion
           Set<AclBinding> addBindings = new HashSet<>(bindings);
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
     }
 
     private void updateTopicAcls(List<AclBinding> bindings) {
-        log.trace("Syncing {} topic ACL bindings.", bindings.size());
-        targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-            if (e != null) {
-                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-            }
-        }));
+        Set<AclBinding> addBindings = new HashSet<>();
+        addBindings.addAll(bindings);
+        addBindings.removeAll(knownTopicAclBindings);
+        if (!addBindings.isEmpty()) {
+            log.info("Syncing new found {} topic ACL bindings.", addBindings.size());
+            targetAdminClient.createAcls(addBindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+                if (e != null) {
+                    log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
+                }
+            }));
+            knownTopicAclBindings = bindings;

Review Comment:
   ```suggestion
               knownTopicAclBindings =  new HashSet<>(bindings);
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
     }
 
     private void updateTopicAcls(List<AclBinding> bindings) {
-        log.trace("Syncing {} topic ACL bindings.", bindings.size());
-        targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-            if (e != null) {
-                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-            }
-        }));
+        Set<AclBinding> addBindings = new HashSet<>();
+        addBindings.addAll(bindings);
+        addBindings.removeAll(knownTopicAclBindings);

Review Comment:
   My IDE nagged me about possible slow performance for invoking `Set::removeAll` with a `List` as an argument. Some research led to [this fascinating blog post](https://codeblog.jonskeet.uk/2010/07/29/there-s-a-hole-in-my-abstraction-dear-liza-dear-liza/).
   
   I think the scenario described there isn't likely to impact us frequently, but just in case, do you think we can change `knownTopicAclBindings` from a `List` to a `Set`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1630624613

   I would like to ask a question by the way: Why do we not synchronize the write permission of `TopicAclBing` and `GroupAclBinding` in MirrorSourceConnector? @C0urante @gharris1727 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1613529431

   Hi! please help to review this PR when you are free, thank you! @C0urante 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1259093047


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -581,13 +582,23 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
         }));
     }
 
-    private void updateTopicAcls(List<AclBinding> bindings) {
-        log.trace("Syncing {} topic ACL bindings.", bindings.size());
-        targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-            if (e != null) {
-                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-            }
-        }));
+    // Visible for testing
+    int updateTopicAcls(List<AclBinding> bindings) {
+        Set<AclBinding> addBindings = new HashSet<>(bindings);
+        addBindings.removeAll(knownTopicAclBindings);
+        int newBindCount = addBindings.size();
+        if (!addBindings.isEmpty()) {
+            log.info("Syncing new found {} topic ACL bindings.", newBindCount);
+            targetAdminClient.createAcls(addBindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+                if (e != null) {
+                    log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
+                }
+            }));
+            knownTopicAclBindings = new HashSet<>(bindings);

Review Comment:
   nice cacth !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector [kafka]

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi closed pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector
URL: https://github.com/apache/kafka/pull/13913


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1259093047


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -581,13 +582,23 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
         }));
     }
 
-    private void updateTopicAcls(List<AclBinding> bindings) {
-        log.trace("Syncing {} topic ACL bindings.", bindings.size());
-        targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-            if (e != null) {
-                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-            }
-        }));
+    // Visible for testing
+    int updateTopicAcls(List<AclBinding> bindings) {
+        Set<AclBinding> addBindings = new HashSet<>(bindings);
+        addBindings.removeAll(knownTopicAclBindings);
+        int newBindCount = addBindings.size();
+        if (!addBindings.isEmpty()) {
+            log.info("Syncing new found {} topic ACL bindings.", newBindCount);
+            targetAdminClient.createAcls(addBindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+                if (e != null) {
+                    log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
+                }
+            }));
+            knownTopicAclBindings = new HashSet<>(bindings);

Review Comment:
   nice catch !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1635200263

   Especially thanks for your separate reply! @C0urante I have tracked it in the [Jira ticket](https://issues.apache.org/jira/browse/KAFKA-15172).
   
   Going back to this PR, although it is named incremental ACL synchronization, it does not change the existing behavior. The reason is: `knownTopicAclBindings` saves all the ACL that `MirrorSourceConnector` has sensed for the synchronization topics of the source cluster and has been synchronized to the target cluster. When the initial startup, all relevant ACL will be synchronized, and then as long as the source cluster ACL changes, it will be synchronized to the target cluster again as a new ACL (so it caches not the view of the target cluster's ACLs, but the view of the source cluster's ACLs), as for you mentioned "The current behavior is responsible not only for creating initial ACL bindings, but also for continuously re-applying them if they are changed." In fact, the logic has not changed, because the `AclBinding` class implements `equals` layer by layer, the resynchronization of the changed `AclBinding` should not be missed.
   
   In addition, the "affect the accumulation of the request queue of the target cluster and further affect the processing delay of other type requests" I mentioned is not groundless. It is a serious problem found in production environment. It can be seen that full synchronization and incremental synchronization are important for target cluster producer latency impact:
   ![image](https://github.com/apache/kafka/assets/16536770/9713ae3b-5793-452d-b88d-13b344488f24)
   ![image](https://github.com/apache/kafka/assets/16536770/5d56ea52-43e9-4a8b-ae3b-510ff66a2b05)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1639053363

   The original MM2 KIPs use very few words to describe very large parts of it's functionality, often leaving things very under-specified, which I think is the case here. I don't think that the original proposal gives us enough to decide for or against this change.
   
   Personally, I think that if a user can get themselves into a situation where they:
   1. Have ACL sync enabled
   2. Can externally observe some difference between the source and target
   3. The difference has existed for (2x) longer than the sync interval
   
   They are reasonable to conclude that the system is misbehaving, either because the source or target system is unhealthy, or MM2 is unhealthy, or MM2 has a bug in it. If caching causes the above situation to occur, I don't think that caching is a viable solution.
   
   I'd be interested in trying other strategies such as:
   1. Intentionally un-batching these requests so as to spread them evenly across the poll interval
   2. Performing a target read-before-write to replace (potentially expensive?) write calls with read calls
   3. Waiting for previous requests to finish before initiating subsequent ones
   4. Exponentially backing off after failures
   
   @hudeqi In your environment, are you noticing the load on the source system from the ACL reads? Do you have more MM2s connected to the target cluster or the source cluster? I'm wondering if (2) would actually be helpful, or if reads and writes have approximately the same cost.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1258611895


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -581,13 +582,23 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
         }));
     }
 
-    private void updateTopicAcls(List<AclBinding> bindings) {
-        log.trace("Syncing {} topic ACL bindings.", bindings.size());
-        targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-            if (e != null) {
-                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-            }
-        }));
+    // Visible for testing
+    int updateTopicAcls(List<AclBinding> bindings) {
+        Set<AclBinding> addBindings = new HashSet<>(bindings);
+        addBindings.removeAll(knownTopicAclBindings);
+        int newBindCount = addBindings.size();
+        if (!addBindings.isEmpty()) {
+            log.info("Syncing new found {} topic ACL bindings.", newBindCount);
+            targetAdminClient.createAcls(addBindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+                if (e != null) {
+                    log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
+                }
+            }));
+            knownTopicAclBindings = new HashSet<>(bindings);

Review Comment:
   Hmmm... won't this cause issues if the call above to `targetAdminClient::createAcls` fails?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -683,4 +686,35 @@ private Optional<ConfigValue> validateProperty(String name, Map<String, String>
         assertNotNull(result, "Connector should not have record null config value for '" + name + "' property");
         return Optional.of(result);
     }
+
+    @Test
+    public void testUpdateIncrementTopicAcls() {
+        Admin sourceAdmin = mock(Admin.class);
+        Admin targetAdmin = mock(Admin.class);
+        MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin);
+
+        List<AclBinding> filteredBindings = new ArrayList<>();
+        AclBinding binding1 = mock(AclBinding.class);
+        AclBinding binding2 = mock(AclBinding.class);
+        filteredBindings.add(binding1);
+        filteredBindings.add(binding2);
+        doReturn(mock(CreateAclsResult.class)).when(targetAdmin).createAcls(anySet());
+
+        // First topic acl info update when starting `syncTopicAcls` thread
+        int newAddCount = connector.updateTopicAcls(filteredBindings);
+        assertEquals(connector.knownTopicAclBindings(), new HashSet<>(filteredBindings));
+        assertTrue(newAddCount == filteredBindings.size());
+
+        List<AclBinding> newAddBindings = new ArrayList<>();
+        AclBinding binding3 = mock(AclBinding.class);
+        AclBinding binding4 = mock(AclBinding.class);
+        newAddBindings.add(binding3);
+        newAddBindings.add(binding4);
+        filteredBindings.addAll(newAddBindings);
+
+        // The next increment topic acl info update
+        newAddCount = connector.updateTopicAcls(filteredBindings);
+        assertEquals(connector.knownTopicAclBindings(), new HashSet<>(filteredBindings));
+        assertTrue(newAddCount == newAddBindings.size());

Review Comment:
   ```suggestion
           assertEquals(newAddBindings.size(), newAddCount);
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -683,4 +686,35 @@ private Optional<ConfigValue> validateProperty(String name, Map<String, String>
         assertNotNull(result, "Connector should not have record null config value for '" + name + "' property");
         return Optional.of(result);
     }
+
+    @Test
+    public void testUpdateIncrementTopicAcls() {
+        Admin sourceAdmin = mock(Admin.class);
+        Admin targetAdmin = mock(Admin.class);
+        MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin);
+
+        List<AclBinding> filteredBindings = new ArrayList<>();
+        AclBinding binding1 = mock(AclBinding.class);
+        AclBinding binding2 = mock(AclBinding.class);
+        filteredBindings.add(binding1);
+        filteredBindings.add(binding2);
+        doReturn(mock(CreateAclsResult.class)).when(targetAdmin).createAcls(anySet());
+
+        // First topic acl info update when starting `syncTopicAcls` thread
+        int newAddCount = connector.updateTopicAcls(filteredBindings);
+        assertEquals(connector.knownTopicAclBindings(), new HashSet<>(filteredBindings));
+        assertTrue(newAddCount == filteredBindings.size());

Review Comment:
   ```suggestion
           assertEquals(filteredBindings.size(), newAddCount);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector [kafka]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1842029086

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has  merge conflicts, please update it with the latest from trunk (or appropriate release branch) <p> If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hudeqi commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "hudeqi (via GitHub)" <gi...@apache.org>.
hudeqi commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1248609883


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
     }
 
     private void updateTopicAcls(List<AclBinding> bindings) {
-        log.trace("Syncing {} topic ACL bindings.", bindings.size());
-        targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-            if (e != null) {
-                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-            }
-        }));
+        Set<AclBinding> addBindings = new HashSet<>();
+        addBindings.addAll(bindings);
+        addBindings.removeAll(knownTopicAclBindings);

Review Comment:
   Thank you very much. I learned from this blog. 
   I have changed `knownTopicAclBindings` from `List` to `Set`. 
   I would like to ask a digression: which is your IDE? 
   What plug-in tool prompts slow performance? @C0urante 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1636074235

   @hudeqi Sorry, I think there's a misunderstanding here. I'm not claiming that MM2 would be incapable of detecting changes in source cluster ACLs with this change; I'm worried that it would be unable to detect (or really, just overwrite) changes in target cluster ACLs, if they were made by a separate user/process from the MM2 cluster.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org