You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/06/19 20:48:09 UTC
[kafka] branch 2.0 updated: Kafka_7064 - bug introduced when switching config commands to ConfigResource (#5245)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new f87df1c Kafka_7064 - bug introduced when switching config commands to ConfigResource (#5245)
f87df1c is described below
commit f87df1c7f91743f0813139e45c8f2a145ad91767
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Tue Jun 19 21:47:24 2018 +0100
Kafka_7064 - bug introduced when switching config commands to ConfigResource (#5245)
Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Jun Rao <ju...@gmail.com>
---
.../apache/kafka/common/config/ConfigResource.java | 2 +-
.../kafka/common/requests/DeleteAclsRequest.java | 2 +-
.../kafka/common/requests/DescribeAclsRequest.java | 4 +++-
.../kafka/common/config/ConfigResourceTest.java | 2 +-
.../common/requests/DeleteAclsRequestTest.java | 23 +++++++++++++++++----
.../common/requests/DescribeAclsRequestTest.java | 24 ++++++++++++++++++----
.../client/client_compatibility_features_test.py | 4 +++-
7 files changed, 48 insertions(+), 13 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
index d2ed4be..5343a6b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
@@ -33,7 +33,7 @@ public final class ConfigResource {
* Type of resource.
*/
public enum Type {
- BROKER((byte) 3), TOPIC((byte) 2), UNKNOWN((byte) 0);
+ BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
private static final Map<Byte, Type> TYPES = Collections.unmodifiableMap(
Arrays.stream(values()).collect(Collectors.toMap(Type::id, Function.identity()))
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 24b5dab..4c19a4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -158,7 +158,7 @@ public class DeleteAclsRequest extends AbstractRequest {
final boolean unsupported = filters.stream()
.map(AclBindingFilter::patternFilter)
.map(ResourcePatternFilter::patternType)
- .anyMatch(patternType -> patternType != PatternType.LITERAL);
+ .anyMatch(patternType -> patternType != PatternType.LITERAL && patternType != PatternType.ANY);
if (unsupported) {
throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index acee3d9..d219839 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -130,7 +130,9 @@ public class DescribeAclsRequest extends AbstractRequest {
}
private void validate(AclBindingFilter filter, short version) {
- if (version == 0 && filter.patternFilter().patternType() != PatternType.LITERAL) {
+ if (version == 0
+ && filter.patternFilter().patternType() != PatternType.LITERAL
+ && filter.patternFilter().patternType() != PatternType.ANY) {
throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java
index 6324f0e..73effee 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigResourceTest.java
@@ -27,7 +27,7 @@ public class ConfigResourceTest {
@Test
public void shouldGetTypeFromId() {
assertEquals(ConfigResource.Type.TOPIC, ConfigResource.Type.forId((byte) 2));
- assertEquals(ConfigResource.Type.BROKER, ConfigResource.Type.forId((byte) 3));
+ assertEquals(ConfigResource.Type.BROKER, ConfigResource.Type.forId((byte) 4));
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
index 4311813..9be8d59 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.Test;
@@ -43,14 +43,14 @@ public class DeleteAclsRequestTest {
private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
- private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
+ private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "bar", PatternType.ANY),
new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "prefix", PatternType.PREFIXED),
new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
@Test(expected = UnsupportedVersionException.class)
- public void shouldThrowOnV0IfNotLiteral() {
+ public void shouldThrowOnV0IfPrefixed() {
new DeleteAclsRequest(V0, aclFilters(PREFIXED_FILTER));
}
@@ -60,7 +60,7 @@ public class DeleteAclsRequestTest {
}
@Test
- public void shouldRoundTripV0() {
+ public void shouldRoundTripLiteralV0() {
final DeleteAclsRequest original = new DeleteAclsRequest(V0, aclFilters(LITERAL_FILTER));
final Struct struct = original.toStruct();
@@ -70,6 +70,21 @@ public class DeleteAclsRequestTest {
}
@Test
+ public void shouldRoundTripAnyV0AsLiteral() {
+ final DeleteAclsRequest original = new DeleteAclsRequest(V0, aclFilters(ANY_FILTER));
+ final DeleteAclsRequest expected = new DeleteAclsRequest(V0, aclFilters(
+ new AclBindingFilter(new ResourcePatternFilter(
+ ANY_FILTER.patternFilter().resourceType(),
+ ANY_FILTER.patternFilter().name(),
+ PatternType.LITERAL),
+ ANY_FILTER.entryFilter())));
+
+ final DeleteAclsRequest result = new DeleteAclsRequest(original.toStruct(), V0);
+
+ assertRequestEquals(expected, result);
+ }
+
+ @Test
public void shouldRoundTripV1() {
final DeleteAclsRequest original = new DeleteAclsRequest(V1, aclFilters(LITERAL_FILTER, PREFIXED_FILTER, ANY_FILTER));
final Struct struct = original.toStruct();
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
index d7c6593..7d9d1b1 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.Test;
@@ -40,14 +40,14 @@ public class DescribeAclsRequestTest {
private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
- private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
+ private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "bar", PatternType.ANY),
new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "foo", PatternType.LITERAL),
new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
@Test(expected = UnsupportedVersionException.class)
- public void shouldThrowOnV0IfNotLiteral() {
+ public void shouldThrowOnV0IfPrefixed() {
new DescribeAclsRequest(PREFIXED_FILTER, V0);
}
@@ -57,7 +57,7 @@ public class DescribeAclsRequestTest {
}
@Test
- public void shouldRoundTripV0() {
+ public void shouldRoundTripLiteralV0() {
final DescribeAclsRequest original = new DescribeAclsRequest(LITERAL_FILTER, V0);
final Struct struct = original.toStruct();
@@ -67,6 +67,22 @@ public class DescribeAclsRequestTest {
}
@Test
+ public void shouldRoundTripAnyV0AsLiteral() {
+ final DescribeAclsRequest original = new DescribeAclsRequest(ANY_FILTER, V0);
+ final DescribeAclsRequest expected = new DescribeAclsRequest(
+ new AclBindingFilter(new ResourcePatternFilter(
+ ANY_FILTER.patternFilter().resourceType(),
+ ANY_FILTER.patternFilter().name(),
+ PatternType.LITERAL),
+ ANY_FILTER.entryFilter()), V0);
+
+ final Struct struct = original.toStruct();
+ final DescribeAclsRequest result = new DescribeAclsRequest(struct, V0);
+
+ assertRequestEquals(expected, result);
+ }
+
+ @Test
public void shouldRoundTripLiteralV1() {
final DescribeAclsRequest original = new DescribeAclsRequest(LITERAL_FILTER, V1);
final Struct struct = original.toStruct();
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index c5c2f2d..d386578 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -23,7 +23,7 @@ from ducktape.tests.test import TestContext
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from ducktape.tests.test import Test
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, V_0_11_0_0, V_0_10_1_0, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, V_0_11_0_0, V_0_10_1_0, KafkaVersion
def get_broker_features(broker_version):
features = {}
@@ -102,6 +102,8 @@ class ClientCompatibilityFeaturesTest(Test):
@parametrize(broker_version=str(LATEST_0_10_1))
@parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_11_0))
+ @parametrize(broker_version=str(LATEST_1_0))
+ @parametrize(broker_version=str(LATEST_1_1))
def run_compatibility_test(self, broker_version):
self.zk.start()
self.kafka.set_version(KafkaVersion(broker_version))