You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/06/19 20:47:28 UTC

[kafka] branch trunk updated: Kafka_7064 - bug introduced when switching config commands to ConfigResource (#5245)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d2b84bd  Kafka_7064 - bug introduced when switching config commands to ConfigResource  (#5245)
d2b84bd is described below

commit d2b84bd4b03d528ff82c8fb6067f118ca9256792
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Tue Jun 19 21:47:24 2018 +0100

    Kafka_7064 - bug introduced when switching config commands to ConfigResource  (#5245)
    
    Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Jun Rao <ju...@gmail.com>
---
 .../apache/kafka/common/config/ConfigResource.java |  2 +-
 .../kafka/common/requests/DeleteAclsRequest.java   |  2 +-
 .../kafka/common/requests/DescribeAclsRequest.java |  4 +++-
 .../kafka/common/config/ConfigResourceTest.java    |  2 +-
 .../common/requests/DeleteAclsRequestTest.java     | 23 +++++++++++++++++----
 .../common/requests/DescribeAclsRequestTest.java   | 24 ++++++++++++++++++----
 .../client/client_compatibility_features_test.py   |  4 +++-
 7 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
index d2ed4be..5343a6b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
@@ -33,7 +33,7 @@ public final class ConfigResource {
      * Type of resource.
      */
     public enum Type {
-        BROKER((byte) 3), TOPIC((byte) 2), UNKNOWN((byte) 0);
+        BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
 
         private static final Map<Byte, Type> TYPES = Collections.unmodifiableMap(
             Arrays.stream(values()).collect(Collectors.toMap(Type::id, Function.identity()))
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 24b5dab..4c19a4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -158,7 +158,7 @@ public class DeleteAclsRequest extends AbstractRequest {
             final boolean unsupported = filters.stream()
                 .map(AclBindingFilter::patternFilter)
                 .map(ResourcePatternFilter::patternType)
-                .anyMatch(patternType -> patternType != PatternType.LITERAL);
+                .anyMatch(patternType -> patternType != PatternType.LITERAL && patternType != PatternType.ANY);
             if (unsupported) {
                 throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index acee3d9..d219839 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -130,7 +130,9 @@ public class DescribeAclsRequest extends AbstractRequest {
     }
 
     private void validate(AclBindingFilter filter, short version) {
-        if (version == 0 && filter.patternFilter().patternType() != PatternType.LITERAL) {
+        if (version == 0
+            && filter.patternFilter().patternType() != PatternType.LITERAL
+            && filter.patternFilter().patternType() != PatternType.ANY) {
             throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java
index 6324f0e..73effee 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java
@@ -27,7 +27,7 @@ public class ConfigResourceTest {
     @Test
     public void shouldGetTypeFromId() {
         assertEquals(ConfigResource.Type.TOPIC, ConfigResource.Type.forId((byte) 2));
-        assertEquals(ConfigResource.Type.BROKER, ConfigResource.Type.forId((byte) 3));
+        assertEquals(ConfigResource.Type.BROKER, ConfigResource.Type.forId((byte) 4));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
index 4311813..9be8d59 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
@@ -43,14 +43,14 @@ public class DeleteAclsRequestTest {
     private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
+    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "bar", PatternType.ANY),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
     private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "prefix", PatternType.PREFIXED),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
     @Test(expected = UnsupportedVersionException.class)
-    public void shouldThrowOnV0IfNotLiteral() {
+    public void shouldThrowOnV0IfPrefixed() {
         new DeleteAclsRequest(V0, aclFilters(PREFIXED_FILTER));
     }
 
@@ -60,7 +60,7 @@ public class DeleteAclsRequestTest {
     }
 
     @Test
-    public void shouldRoundTripV0() {
+    public void shouldRoundTripLiteralV0() {
         final DeleteAclsRequest original = new DeleteAclsRequest(V0, aclFilters(LITERAL_FILTER));
         final Struct struct = original.toStruct();
 
@@ -70,6 +70,21 @@ public class DeleteAclsRequestTest {
     }
 
     @Test
+    public void shouldRoundTripAnyV0AsLiteral() {
+        final DeleteAclsRequest original = new DeleteAclsRequest(V0, aclFilters(ANY_FILTER));
+        final DeleteAclsRequest expected = new DeleteAclsRequest(V0, aclFilters(
+            new AclBindingFilter(new ResourcePatternFilter(
+                ANY_FILTER.patternFilter().resourceType(),
+                ANY_FILTER.patternFilter().name(),
+                PatternType.LITERAL),
+                ANY_FILTER.entryFilter())));
+
+        final DeleteAclsRequest result = new DeleteAclsRequest(original.toStruct(), V0);
+
+        assertRequestEquals(expected, result);
+    }
+
+    @Test
     public void shouldRoundTripV1() {
         final DeleteAclsRequest original = new DeleteAclsRequest(V1, aclFilters(LITERAL_FILTER, PREFIXED_FILTER, ANY_FILTER));
         final Struct struct = original.toStruct();
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
index d7c6593..7d9d1b1 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
@@ -40,14 +40,14 @@ public class DescribeAclsRequestTest {
     private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
+    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "bar", PatternType.ANY),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
     private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "foo", PatternType.LITERAL),
         new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
     @Test(expected = UnsupportedVersionException.class)
-    public void shouldThrowOnV0IfNotLiteral() {
+    public void shouldThrowOnV0IfPrefixed() {
         new DescribeAclsRequest(PREFIXED_FILTER, V0);
     }
 
@@ -57,7 +57,7 @@ public class DescribeAclsRequestTest {
     }
 
     @Test
-    public void shouldRoundTripV0() {
+    public void shouldRoundTripLiteralV0() {
         final DescribeAclsRequest original = new DescribeAclsRequest(LITERAL_FILTER, V0);
         final Struct struct = original.toStruct();
 
@@ -67,6 +67,22 @@ public class DescribeAclsRequestTest {
     }
 
     @Test
+    public void shouldRoundTripAnyV0AsLiteral() {
+        final DescribeAclsRequest original = new DescribeAclsRequest(ANY_FILTER, V0);
+        final DescribeAclsRequest expected = new DescribeAclsRequest(
+            new AclBindingFilter(new ResourcePatternFilter(
+                ANY_FILTER.patternFilter().resourceType(),
+                ANY_FILTER.patternFilter().name(),
+                PatternType.LITERAL),
+                ANY_FILTER.entryFilter()), V0);
+
+        final Struct struct = original.toStruct();
+        final DescribeAclsRequest result = new DescribeAclsRequest(struct, V0);
+
+        assertRequestEquals(expected, result);
+    }
+
+    @Test
     public void shouldRoundTripLiteralV1() {
         final DescribeAclsRequest original = new DescribeAclsRequest(LITERAL_FILTER, V1);
         final Struct struct = original.toStruct();
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index c5c2f2d..d386578 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -23,7 +23,7 @@ from ducktape.tests.test import TestContext
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from ducktape.tests.test import Test
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, V_0_11_0_0, V_0_10_1_0, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, V_0_11_0_0, V_0_10_1_0, KafkaVersion
 
 def get_broker_features(broker_version):
     features = {}
@@ -102,6 +102,8 @@ class ClientCompatibilityFeaturesTest(Test):
     @parametrize(broker_version=str(LATEST_0_10_1))
     @parametrize(broker_version=str(LATEST_0_10_2))
     @parametrize(broker_version=str(LATEST_0_11_0))
+    @parametrize(broker_version=str(LATEST_1_0))
+    @parametrize(broker_version=str(LATEST_1_1))
     def run_compatibility_test(self, broker_version):
         self.zk.start()
         self.kafka.set_version(KafkaVersion(broker_version))