Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/13 22:10:47 UTC

[GitHub] [kafka] cmccabe opened a new pull request, #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

cmccabe opened a new pull request, #12636:
URL: https://github.com/apache/kafka/pull/12636

   Convert StandardAuthorizer to use a copy-on-write sorted array rather than a ConcurrentSkipListSet. The issue with the skip list was that, because it was modified while in use by StandardAuthorizer#authorize, we could sometimes expose an inconsistent state. For example, if we added a "deny principal foo" followed by an "allow all", a request for principal foo might happen to see the second one without seeing the first, even though the first was added first.
   
   The sorted array data structure is more compact than the tree, although it has the disadvantage of needing to be copied every time it's modified. On the broker side, we can apply a batch of updates atomically now. On the controller side, we still apply updates one-by-one, for now.
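
The copy-on-write idea described above can be sketched roughly as follows. This is a minimal illustration and not the code in the PR: the class `CopyOnWriteSortedRules`, its methods, and the use of plain strings in place of ACLs are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not the StandardAuthorizer implementation.
final class CopyOnWriteSortedRules {
    // Readers load this reference once and then work on an array that is never mutated.
    private volatile String[] rules = new String[0];

    // Writers are serialized. Each update builds a new sorted array and publishes it in one step.
    synchronized void applyBatch(List<String> added) {
        String[] current = rules;
        String[] next = Arrays.copyOf(current, current.length + added.size());
        for (int i = 0; i < added.size(); i++) {
            next[current.length + i] = added.get(i);
        }
        Arrays.sort(next);
        rules = next; // single volatile write: a reader sees the whole batch or none of it
    }

    // Returns the array that is current at the time of the call.
    String[] snapshot() {
        return rules;
    }
}
```

Because the array is replaced rather than edited, a reader that obtained the old array never observes a half-applied batch. The cost is a full copy and sort on every update.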
   
   This PR renames AclAuthorizerBenchmark to AuthorizerBenchmark and extends it to report information about StandardAuthorizer as well as AclAuthorizer.
   
   Co-authored-by: Akhilesh Chaganti <ac...@confluent.io>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971108745


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
 
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
 
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Map.Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   Oh, I must have been looking at StandardAclWithId :). This looks good as-is.





[GitHub] [kafka] ijuma commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r972004724


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
         return results;
     }

Review Comment:
   It also ensures a consistent snapshot is used for the duration of the method, right? Otherwise, one could get an updated version of the field for some of the authorize calls.
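
The point about reading the field once can be illustrated with a small sketch. The names `SnapshotReadSketch`, `Data`, and `versionsSeen` are hypothetical, and the `Runnable` merely simulates an update that lands in the middle of a call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of reading a volatile field once per call.
final class SnapshotReadSketch {
    // Stand-in for an immutable data object such as StandardAuthorizerData.
    static final class Data {
        final int version;
        Data(int version) { this.version = version; }
    }

    private volatile Data data = new Data(0);

    void publish(Data newData) {
        data = newData;
    }

    // Copies the field into a local first, so every action in the batch is
    // evaluated against the same Data even if publish() runs in between.
    List<Integer> versionsSeen(int numActions, Runnable betweenActions) {
        Data curData = data;
        List<Integer> seen = new ArrayList<>();
        for (int i = 0; i < numActions; i++) {
            seen.add(curData.version);
            betweenActions.run(); // simulates an update arriving mid-call
        }
        return seen;
    }
}
```

If the loop read `data` directly on each iteration instead of `curData`, later actions in the same call could be evaluated against a newer object than earlier ones.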





[GitHub] [kafka] divijvaidya closed pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya closed pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
URL: https://github.com/apache/kafka/pull/12636




[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971092861


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
     private final DefaultRule defaultRule;
 
     /**
-     * Contains all of the current ACLs sorted by (resource type, resource name).
+     * An immutable array of all the current ACLs sorted by (resource type, resource name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
   TreeSets are quite slow because they have poor memory locality. They also use a lot more memory. Ideally we could use something like a BTree, but Java doesn't have those, unfortunately...





[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971319874


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
         return results;
     }
 
+    @Override
+    public AuthorizationResult authorizeByResourceType(

Review Comment:
   Mainly I added this because the benchmarks looked really bad without it 😬





[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r970977191


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -397,18 +387,34 @@ private MatchingAclRule findAclRule(
         return matchingAclBuilder.build();
     }
 
+    /**
+     * Use a binary search to find the index of the first ACL which is greater than or
+     * equal to the given ACL. This may be equal to the end of the array if there are
+     * no such ACLs.
+     */
+    private int indexOfFirstAclGreaterThanOrEqualTo(StandardAcl exemplar) {
+        int i = Arrays.binarySearch(acls,
+                new StandardAclWithId(Uuid.ZERO_UUID, exemplar),
+                StandardAclWithId.ACL_COMPARATOR);
+        // Arrays.binarySearch returns a positive number if it found an exact match, and
+        // a negative number otherwise.

Review Comment:
   Might comment about what the negative return value indicates. It helps L404 make more sense :)
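
For reference, `Arrays.binarySearch` returns a non-negative index (possibly 0) on an exact match, and `-(insertionPoint) - 1` otherwise, where the insertion point is the index of the first element greater than the key. A minimal sketch of turning that into a "first element greater than or equal to" index is shown below, using an `int[]` instead of ACLs. The class and method names are hypothetical, and the walk-back loop is an assumption for arrays that may contain equal keys, since `binarySearch` may return any matching index.

```java
import java.util.Arrays;

// Hypothetical illustration of decoding the Arrays.binarySearch return value.
final class LowerBoundSketch {
    static int indexOfFirstGreaterThanOrEqualTo(int[] sorted, int key) {
        int i = Arrays.binarySearch(sorted, key);
        if (i < 0) {
            // Not found: i == -(insertionPoint) - 1, so recover the insertion point.
            return -(i + 1);
        }
        // Found: step back over any equal neighbors to reach the first match.
        while (i > 0 && sorted[i - 1] == key) {
            i--;
        }
        return i;
    }
}
```

The result equals the array length when every element is smaller than the key, which matches the "may be equal to the end of the array" wording in the Javadoc above.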



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
     private final DefaultRule defaultRule;
 
     /**
-     * Contains all of the current ACLs sorted by (resource type, resource name).
+     * An immutable array of all the current ACLs sorted by (resource type, resource name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
   I see we are replacing the skip-list set with a sorted array. Don't we need to guard against duplicates in the array?
   
   If we used a TreeSet here, it would be closer to the current implementation, and I think copying from another TreeSet should take linear time.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
 
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
 
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Map.Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   Could we just take a `Collection<StandardAcl>` since the acl has the ID as a property?



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
     private final DefaultRule defaultRule;
 
     /**
-     * Contains all of the current ACLs sorted by (resource type, resource name).
+     * An immutable array of all the current ACLs sorted by (resource type, resource name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
-
-    /**
-     * Contains all of the current ACLs indexed by UUID.
-     */
-    private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;

Review Comment:
   Guess we don't need this anymore if we are removing `addAcl` and `removeAcl`?



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +173,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }
-            log.trace("Added ACL {}: {}", id, acl);
-        } catch (Throwable e) {
-            log.error("addAcl error", e);
-            throw e;
-        }
+    StandardAuthorizerData copyWithAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        return copyWithNewAcls(acls, newAclEntries, removedAclIds);
     }
 
-    void removeAcl(Uuid id) {
-        try {
-            StandardAcl acl = aclsById.remove(id);
-            if (acl == null) {
-                throw new RuntimeException("ID " + id + " not found in aclsById.");
+    StandardAuthorizerData copyWithNewAcls(
+        StandardAclWithId[] existingAcls,
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        StandardAclWithId[] newAcls = new StandardAclWithId[
+                existingAcls.length + newAclEntries.size() - removedAclIds.size()];

Review Comment:
   nit: pull the size into a local so we don't break the line on the array brackets





[GitHub] [kafka] akhileshchg commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
akhileshchg commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971088491


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
     private final DefaultRule defaultRule;
 
     /**
-     * Contains all of the current ACLs sorted by (resource type, resource name).
+     * An immutable array of all the current ACLs sorted by (resource type, resource name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
   We're storing the ID together with the `StandardAcl`. Shouldn't that make each entry unique? Since the array is sorted, maybe we can add a conservative check to make sure there are no duplicate IDs.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
 
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
 
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Map.Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   I don't think `StandardAcl` carries an ID. We have a separate data structure for that: `StandardAclWithId`.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
         return results;
     }

Review Comment:
   In the `authorize` function we still do `StandardAuthorizerData curData = data;`. I don't think we need to do this anymore.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }
-            log.trace("Added ACL {}: {}", id, acl);
-        } catch (Throwable e) {
-            log.error("addAcl error", e);
-            throw e;
-        }
+    StandardAuthorizerData copyWithAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        return copyWithNewAcls(acls, newAclEntries, removedAclIds);
     }
 
-    void removeAcl(Uuid id) {
-        try {
-            StandardAcl acl = aclsById.remove(id);
-            if (acl == null) {
-                throw new RuntimeException("ID " + id + " not found in aclsById.");
+    StandardAuthorizerData copyWithNewAcls(
+        StandardAclWithId[] existingAcls,
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        StandardAclWithId[] newAcls = new StandardAclWithId[
+                existingAcls.length + newAclEntries.size() - removedAclIds.size()];
+        int numRemoved = 0, j = 0;
+        for (int i = 0; i < existingAcls.length; i++) {
+            StandardAclWithId aclWithId = existingAcls[i];
+            if (removedAclIds.contains(aclWithId.id())) {
+                numRemoved++;
+            } else {
+                newAcls[j++] = aclWithId;
             }
-            if (!aclsByResource.remove(acl)) {
-                throw new RuntimeException("Unable to remove the ACL with ID " + id +
-                    " from aclsByResource");
+        }
+        if (numRemoved < removedAclIds.size()) {
+            throw new RuntimeException("Only located " + numRemoved + " out of " +
+                removedAclIds.size() + " removed ACL ID(s). removedAclIds = " +
+                removedAclIds.stream().map(a -> a.toString()).collect(Collectors.joining(", ")));
+        }
+        if (!newAclEntries.isEmpty()) {
+            int i = 0;
+            for (Entry<Uuid, StandardAcl> entry : newAclEntries) {
+                newAcls[existingAcls.length + i] = new StandardAclWithId(entry.getKey(), entry.getValue());
+                i++;
             }

Review Comment:
   Compared to the previous code, we're missing the check for duplicate ids. 





[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971091182


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
 
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
 
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Map.Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   StandardAcl doesn't contain an ID





[GitHub] [kafka] ijuma commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971999299


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
     private final DefaultRule defaultRule;
 
     /**
-     * Contains all of the current ACLs sorted by (resource type, resource name).
+     * An immutable array of all the current ACLs sorted by (resource type, resource name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
   The downside of the current implementation is that updates are pretty expensive, right? Can we file a JIRA to investigate this further post 3.3?





[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971095900


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +173,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }
-            log.trace("Added ACL {}: {}", id, acl);
-        } catch (Throwable e) {
-            log.error("addAcl error", e);
-            throw e;
-        }
+    StandardAuthorizerData copyWithAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        return copyWithNewAcls(acls, newAclEntries, removedAclIds);
     }
 
-    void removeAcl(Uuid id) {
-        try {
-            StandardAcl acl = aclsById.remove(id);
-            if (acl == null) {
-                throw new RuntimeException("ID " + id + " not found in aclsById.");
+    StandardAuthorizerData copyWithNewAcls(
+        StandardAclWithId[] existingAcls,
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        StandardAclWithId[] newAcls = new StandardAclWithId[
+                existingAcls.length + newAclEntries.size() - removedAclIds.size()];

Review Comment:
   good idea





[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971280346


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }
-            log.trace("Added ACL {}: {}", id, acl);
-        } catch (Throwable e) {
-            log.error("addAcl error", e);
-            throw e;
-        }
+    StandardAuthorizerData copyWithAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        return copyWithNewAcls(acls, newAclEntries, removedAclIds);
     }
 
-    void removeAcl(Uuid id) {
-        try {
-            StandardAcl acl = aclsById.remove(id);
-            if (acl == null) {
-                throw new RuntimeException("ID " + id + " not found in aclsById.");
+    StandardAuthorizerData copyWithNewAcls(
+        StandardAclWithId[] existingAcls,
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        StandardAclWithId[] newAcls = new StandardAclWithId[
+                existingAcls.length + newAclEntries.size() - removedAclIds.size()];
+        int numRemoved = 0, j = 0;
+        for (int i = 0; i < existingAcls.length; i++) {
+            StandardAclWithId aclWithId = existingAcls[i];
+            if (removedAclIds.contains(aclWithId.id())) {
+                numRemoved++;
+            } else {
+                newAcls[j++] = aclWithId;
             }
-            if (!aclsByResource.remove(acl)) {
-                throw new RuntimeException("Unable to remove the ACL with ID " + id +
-                    " from aclsByResource");
+        }
+        if (numRemoved < removedAclIds.size()) {
+            throw new RuntimeException("Only located " + numRemoved + " out of " +
+                removedAclIds.size() + " removed ACL ID(s). removedAclIds = " +
+                removedAclIds.stream().map(a -> a.toString()).collect(Collectors.joining(", ")));
+        }
+        if (!newAclEntries.isEmpty()) {
+            int i = 0;
+            for (Entry<Uuid, StandardAcl> entry : newAclEntries) {
+                newAcls[existingAcls.length + i] = new StandardAclWithId(entry.getKey(), entry.getValue());
+                i++;
             }

Review Comment:
   Duplicate IDs should not happen unless there is a bug. I do wish we could check for it here, but it would be very inefficient to do so, since we'd have to scan the whole array.
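
To make the cost under discussion concrete, here is one way such a check could be written. This is purely a sketch and not something the PR does: `DuplicateIdCheckSketch` and `findDuplicateId` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and removals in the same batch are ignored.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical illustration of a duplicate-ID check; not part of the PR.
final class DuplicateIdCheckSketch {
    // Returns the first ID found twice, or null if all IDs are distinct.
    static UUID findDuplicateId(UUID[] existingIds, List<UUID> newIds) {
        Set<UUID> incoming = new HashSet<>();
        for (UUID id : newIds) {
            if (!incoming.add(id)) {
                return id; // repeated within the incoming batch itself
            }
        }
        // This pass touches every existing entry, which is the full-array
        // scan that the comment above describes as expensive.
        for (UUID id : existingIds) {
            if (incoming.contains(id)) {
                return id; // collides with an ID that is already present
            }
        }
        return null;
    }
}
```

The check adds one hash lookup per existing ACL on every update, so its cost grows with the size of the whole array rather than with the size of the batch.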





[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971306933


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
         return results;
     }
 
+    @Override
+    public AuthorizationResult authorizeByResourceType(

Review Comment:
   If I understand correctly, this implementation was added to take advantage of the new binary search approach in the ACL array. In other words, is it an optimization over the default `authorizeByResourceType` implementation?



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java:
##########
@@ -76,14 +78,15 @@ public interface ClusterMetadataAuthorizer extends Authorizer {
     void loadSnapshot(Map<Uuid, StandardAcl> acls);
 
     /**
-     * Add a new ACL. Any ACL with the same ID will be replaced.
-     */
-    void addAcl(Uuid id, StandardAcl acl);
-
-    /**
-     * Remove the ACL with the given ID.
+     * Add or remove ACLs.
+     *
+     * @param newAcls           The ACLs to add.
+     * @param removedAclIds     The ACL IDs to remove.
      */
-    void removeAcl(Uuid id);
+    void applyAclChanges(

Review Comment:
   We should document that this method does not expect duplicates and does not allow replacing an ACL by ID.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }
-            log.trace("Added ACL {}: {}", id, acl);
-        } catch (Throwable e) {
-            log.error("addAcl error", e);
-            throw e;
-        }
+    StandardAuthorizerData copyWithAclChanges(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        return copyWithNewAcls(acls, newAclEntries, removedAclIds);
     }
 
-    void removeAcl(Uuid id) {
-        try {
-            StandardAcl acl = aclsById.remove(id);
-            if (acl == null) {
-                throw new RuntimeException("ID " + id + " not found in aclsById.");
+    StandardAuthorizerData copyWithNewAcls(
+        StandardAclWithId[] existingAcls,
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+        Set<Uuid> removedAclIds
+    ) {
+        int newSize = existingAcls.length + newAclEntries.size() - removedAclIds.size();
+        StandardAclWithId[] newAcls = new StandardAclWithId[newSize];
+        int numRemoved = 0, j = 0;
+        for (int i = 0; i < existingAcls.length; i++) {
+            StandardAclWithId aclWithId = existingAcls[i];
+            if (removedAclIds.contains(aclWithId.id())) {
+                numRemoved++;
+            } else {
+                newAcls[j++] = aclWithId;
             }
-            if (!aclsByResource.remove(acl)) {
-                throw new RuntimeException("Unable to remove the ACL with ID " + id +
-                    " from aclsByResource");
+        }
+        if (numRemoved < removedAclIds.size()) {
+            throw new RuntimeException("Only located " + numRemoved + " out of " +
+                removedAclIds.size() + " removed ACL ID(s). removedAclIds = " +
+                removedAclIds.stream().map(a -> a.toString()).collect(Collectors.joining(", ")));
+        }
+        if (!newAclEntries.isEmpty()) {
+            int i = 0;
+            for (Entry<Uuid, StandardAcl> entry : newAclEntries) {
+                newAcls[existingAcls.length + i] = new StandardAclWithId(entry.getKey(), entry.getValue());

Review Comment:
   Should this index be offset by the number we removed?
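
   The question above can be made concrete with a minimal sketch. It assumes a simplified model in which each ACL is represented only by a string ID; `AclArrayCopy` and `copyWithChanges` are hypothetical names, not part of the PR. Surviving entries are packed into positions `0..j-1`, so new entries have to continue from `j` (that is, `existingAcls.length` minus the number removed) rather than from `existingAcls.length`. Sorting is left out to keep the focus on the indexing.

   ```java
   import java.util.Collection;
   import java.util.Set;

   final class AclArrayCopy {
       /**
        * Returns a copy of existingIds without removedIds, followed by newIds.
        * Plain string IDs stand in for StandardAclWithId (an assumption of this sketch).
        */
       static String[] copyWithChanges(String[] existingIds,
                                       Collection<String> newIds,
                                       Set<String> removedIds) {
           // First pass: verify every removal is present before sizing the new array.
           int numRemoved = 0;
           for (String id : existingIds) {
               if (removedIds.contains(id)) {
                   numRemoved++;
               }
           }
           if (numRemoved != removedIds.size()) {
               throw new IllegalStateException("Only located " + numRemoved + " out of " +
                   removedIds.size() + " removed ACL ID(s).");
           }
           String[] result = new String[existingIds.length - numRemoved + newIds.size()];
           // Second pass: pack the survivors into positions 0..j-1.
           int j = 0;
           for (String id : existingIds) {
               if (!removedIds.contains(id)) {
                   result[j++] = id;
               }
           }
           // New entries continue from j, not from existingIds.length.
           for (String id : newIds) {
               result[j++] = id;
           }
           return result;
       }
   }
   ```

   With three existing ACLs, one removal and one addition, the new array has length 3; writing the new entry at `existingAcls.length + i` would target index 3 and throw an `ArrayIndexOutOfBoundsException`.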



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }

Review Comment:
   Got it. Looking at AclControlManager, we should not generate a record for an existing ACL (as determined by the hash set in AclControlManager).

   The replace-by-ID case via `ClusterMetadataAuthorizer#addAcl` confused me, since I don't see how a caller could pass in the same ID for a new ACL. The doc used to say "Add a new ACL. Any ACL with the same ID will be replaced." Anyway, it's gone now with this PR.





[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971280735


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }

Review Comment:
   As I said above, we'd have to scan the whole array, since we no longer have a map from ID to ACL. I don't think it's worth it just for a sanity check.





[GitHub] [kafka] cmccabe commented on pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on PR #12636:
URL: https://github.com/apache/kafka/pull/12636#issuecomment-1247070076

   Jenkins went down again. :(
   
   ```
   java.nio.file.FileSystemException: /home/jenkins/workspace/Kafka_kafka-pr_PR-12636: No space left on device
   ```




[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971116340


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
             loadingComplete,
             newSuperUsers,
             newDefaultResult,
-            aclsByResource,
-            aclsById);
+            acls);
     }
 
-    StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-        StandardAuthorizerData newData = new StandardAuthorizerData(
-            log,
-            aclMutator,
-            loadingComplete,
-            superUsers,
-            defaultRule.result,
-            new ConcurrentSkipListSet<>(),
-            new ConcurrentHashMap<>());
-        for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-            newData.addAcl(entry.getKey(), entry.getValue());
-        }
-        log.info("Applied {} acl(s) from image.", aclEntries.size());
-        return newData;
+    StandardAuthorizerData copyWithAllNewAcls(
+        Collection<Entry<Uuid, StandardAcl>> newAclEntries
+    ) {
+        return copyWithNewAcls(EMPTY_ACLS, newAclEntries, Collections.emptySet());
     }
 
-    void addAcl(Uuid id, StandardAcl acl) {
-        try {
-            StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-            if (prevAcl != null) {
-                throw new RuntimeException("An ACL with ID " + id + " already exists.");
-            }
-            if (!aclsByResource.add(acl)) {
-                aclsById.remove(id);
-                throw new RuntimeException("Unable to add the ACL with ID " + id +
-                    " to aclsByResource");
-            }

Review Comment:
   I think we lost this existing ID and duplicate ACL check in the new array code. 
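
   For illustration only, here is a minimal sketch of what such a check could look like against a plain array. It assumes ACL IDs are modelled as strings; `DuplicateIdCheck` and `checkNoDuplicateIds` are hypothetical names and not part of the PR. The check needs one full pass over the existing array, so its cost grows with the number of ACLs already loaded.

   ```java
   import java.util.Collection;
   import java.util.HashSet;
   import java.util.Set;

   final class DuplicateIdCheck {
       /**
        * Throws if newIds contains a repeated ID, or an ID already present in existingIds.
        * Plain string IDs stand in for Uuid (an assumption of this sketch).
        */
       static void checkNoDuplicateIds(String[] existingIds, Collection<String> newIds) {
           Set<String> incoming = new HashSet<>();
           for (String id : newIds) {
               if (!incoming.add(id)) {
                   throw new IllegalStateException("ID " + id + " appears twice in the batch.");
               }
           }
           // One O(1) set lookup per existing entry; the whole array is visited.
           for (String id : existingIds) {
               if (incoming.contains(id)) {
                   throw new IllegalStateException("An ACL with ID " + id + " already exists.");
               }
           }
       }
   }
   ```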





[GitHub] [kafka] cmccabe commented on pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on PR #12636:
URL: https://github.com/apache/kafka/pull/12636#issuecomment-1251532201

   I uploaded a new version at https://github.com/apache/kafka/pull/12662
   




[GitHub] [kafka] ijuma commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971999299


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
     private final DefaultRule defaultRule;
 
     /**
-     * Contains all of the current ACLs sorted by (resource type, resource name).
+     * An immutable array of all the current ACLs sorted by (resource type, resource name).
      */
-    private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
   The downside of the current implementation is that updates can be pretty expensive, right? Can we file a JIRA to investigate this further post 3.3.0?





[GitHub] [kafka] ijuma commented on pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12636:
URL: https://github.com/apache/kafka/pull/12636#issuecomment-1248110491

   Thanks for the PR. @cmccabe can you please include the benchmark results before/after this change?




[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971094152


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
     }
 
     @Override
-    public void addAcl(Uuid id, StandardAcl acl) {
-        data.addAcl(id, acl);
-    }
-
-    @Override
-    public void removeAcl(Uuid id) {
-        data.removeAcl(id);
+    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+        data = data.copyWithAllNewAcls(acls.entrySet());
     }
 
     @Override
-    public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-        data = data.copyWithNewAcls(acls.entrySet());
+    public synchronized void applyAclChanges(
+        Collection<Map.Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   However, we could use StandardAclWithId here. That would require another copy in the case where we were loading a snapshot, so I'm not sure if it's worth it.





[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971279173


##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
         return results;
     }

Review Comment:
   The purpose of doing this is to avoid loading the volatile multiple times. Each time we load a volatile, it is expensive because it requires an interlocked instruction.
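
   The pattern described here can be sketched as follows, assuming a simplified authorizer whose state is an immutable snapshot published through a volatile field. `SnapshotAuthorizer`, `Snapshot` and `authorizeAll` are hypothetical names used only for this example. Whatever a volatile read costs on a given platform, reading the field once per call also means that every decision within that call is made against the same snapshot.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   final class SnapshotAuthorizer {
       // Hypothetical immutable snapshot; stands in for StandardAuthorizerData.
       static final class Snapshot {
           final Set<String> allowedPrincipals;

           Snapshot(Set<String> allowedPrincipals) {
               this.allowedPrincipals =
                   Collections.unmodifiableSet(new HashSet<>(allowedPrincipals));
           }
       }

       private volatile Snapshot data = new Snapshot(Collections.<String>emptySet());

       // Writers publish a whole new snapshot; they never mutate the old one.
       synchronized void replace(Set<String> allowedPrincipals) {
           data = new Snapshot(allowedPrincipals);
       }

       List<Boolean> authorizeAll(List<String> principals) {
           Snapshot curData = data; // the only volatile read in this call
           List<Boolean> results = new ArrayList<>(principals.size());
           for (String principal : principals) {
               results.add(curData.allowedPrincipals.contains(principal));
           }
           return results;
       }
   }
   ```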





[GitHub] [kafka] divijvaidya commented on pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #12636:
URL: https://github.com/apache/kafka/pull/12636#issuecomment-1594725046

   > I uploaded a new version at https://github.com/apache/kafka/pull/12662
   
   In that case, closing this one.

