Posted to commits@kafka.apache.org by ju...@apache.org on 2018/06/14 20:49:10 UTC

[kafka] branch 2.0 updated: KAFKA-7010: Rename ResourceNameType to PatternType (#5205)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 24d6036  KAFKA-7010: Rename ResourceNameType to PatternType (#5205)
24d6036 is described below

commit 24d603665a3a2219b040f3213c4b0b09e48568cc
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Thu Jun 14 21:47:59 2018 +0100

    KAFKA-7010: Rename ResourceNameType to PatternType (#5205)
    
    The initial PR for KIP-290 (#5117) added a new `ResourceNameType`, which was initially a field on `Resource` and `ResourceFilter`. However, follow-on PRs have since moved the name type fields to the new `ResourcePattern` and `ResourcePatternFilter` classes. This means the old name is no longer accurate and may be confusing. This PR renames the class to the more intuitive `resource.PatternType`.
    
    @cmccabe also requested that the current `ANY` value for this class be renamed to avoid confusion. `PatternType.ANY` currently causes `ResourcePatternFilter` to bring back all ACLs that would affect the supplied resource, i.e. it brings back literal and wildcard ACLs, and also does pattern matching to work out which prefixed ACLs would affect the resource. This is very different from the behaviour of `ResourceType.ANY`, which just means the filter ignores the type of resources.
    
    `ANY` is renamed to `MATCH` to disambiguate it from other `ANY` filter types. A new `ANY` is added that works in the same way as the others, i.e. it causes the filter to ignore the pattern type (but does not do any pattern matching).
    
    Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Jun Rao <ju...@gmail.com>
---
 .../kafka/clients/consumer/RoundRobinAssignor.java |  2 +-
 .../org/apache/kafka/common/acl/AclBinding.java    |  4 +-
 .../apache/kafka/common/acl/AclBindingFilter.java  |  4 +-
 .../apache/kafka/common/protocol/CommonFields.java |  6 +-
 .../kafka/common/requests/CreateAclsRequest.java   | 18 +++---
 .../kafka/common/requests/DeleteAclsRequest.java   | 18 +++---
 .../kafka/common/requests/DeleteAclsResponse.java  | 20 +++----
 .../kafka/common/requests/DescribeAclsRequest.java | 16 ++---
 .../common/requests/DescribeAclsResponse.java      | 20 +++----
 .../apache/kafka/common/requests/RequestUtils.java | 22 +++----
 .../{ResourceNameType.java => PatternType.java}    | 56 ++++++++++++------
 .../kafka/common/resource/ResourcePattern.java     | 28 ++++-----
 .../common/resource/ResourcePatternFilter.java     | 47 +++++++--------
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 10 ++--
 .../apache/kafka/common/acl/AclBindingTest.java    | 31 +++++-----
 .../common/acl/ResourcePatternFilterTest.java      | 68 +++++++++++++---------
 .../kafka/common/acl/ResourcePatternTest.java      | 15 +++--
 .../common/requests/CreateAclsRequestTest.java     | 10 ++--
 .../common/requests/DeleteAclsRequestTest.java     | 10 ++--
 .../common/requests/DeleteAclsResponseTest.java    | 10 ++--
 .../common/requests/DescribeAclsRequestTest.java   | 10 ++--
 .../common/requests/DescribeAclsResponseTest.java  | 10 ++--
 .../kafka/common/requests/RequestResponseTest.java | 18 +++---
 core/src/main/scala/kafka/admin/AclCommand.scala   | 50 +++++++++-------
 .../common/ZkNodeChangeNotificationListener.scala  |  3 +-
 .../main/scala/kafka/security/SecurityUtils.scala  |  4 +-
 .../scala/kafka/security/auth/Authorizer.scala     | 18 +++---
 .../main/scala/kafka/security/auth/Resource.scala  | 38 ++++++------
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 20 +++----
 core/src/main/scala/kafka/server/KafkaApis.scala   |  6 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 20 +++----
 core/src/main/scala/kafka/zk/ZkData.scala          | 44 +++++++-------
 .../kafka/api/AdminClientIntegrationTest.scala     |  4 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  2 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |  4 +-
 .../api/SaslSslAdminClientIntegrationTest.scala    | 60 +++++++++----------
 .../scala/kafka/security/auth/ResourceTest.scala   |  2 +-
 .../test/scala/kafka/zk/ExtendedAclStoreTest.scala |  2 +-
 .../test/scala/kafka/zk/LiteralAclStoreTest.scala  | 16 ++++-
 .../scala/unit/kafka/admin/AclCommandTest.scala    |  4 +-
 .../ZkNodeChangeNotificationListenerTest.scala     |  2 +-
 .../security/auth/SimpleAclAuthorizerTest.scala    | 67 +++++++++++----------
 .../delegation/DelegationTokenManagerTest.scala    |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  7 +--
 docs/security.html                                 | 57 +++++++++---------
 45 files changed, 475 insertions(+), 410 deletions(-)
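
The difference between the new `ANY` and `MATCH` values described in the commit message can be sketched with a small self-contained model. This is an illustration only, not the code in this commit: the class `PatternTypeSketch`, its nested `Pattern` type, and the methods `filterMatches`, `select` and `samplePatterns` are hypothetical names, the resource type is left out, and the filter name is assumed to be non-null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PatternTypeSketch {
    // Simplified stand-in for the renamed enum; UNKNOWN is omitted.
    public enum PatternType { ANY, MATCH, LITERAL, PREFIXED }

    // Hypothetical stored ACL pattern; the resource type is left out for brevity.
    public static final class Pattern {
        final String name;
        final PatternType type;

        public Pattern(String name, PatternType type) {
            this.name = name;
            this.type = type;
        }

        @Override
        public String toString() {
            return type + ":" + name;
        }
    }

    // Returns true if a filter with the given name and pattern type selects the stored pattern.
    public static boolean filterMatches(String filterName, PatternType filterType, Pattern stored) {
        switch (filterType) {
            case ANY:
                // Ignore the stored pattern type; compare names only, no pattern matching.
                return filterName.equals(stored.name);
            case MATCH:
                // Pattern matching: select every stored pattern that would affect the named resource.
                switch (stored.type) {
                    case LITERAL:
                        return stored.name.equals(filterName) || stored.name.equals("*");
                    case PREFIXED:
                        return filterName.startsWith(stored.name);
                    default:
                        return false;
                }
            default:
                // LITERAL or PREFIXED filter: both the pattern type and the name must be equal.
                return filterType == stored.type && filterName.equals(stored.name);
        }
    }

    // Collects the string form of every stored pattern the filter selects.
    public static List<String> select(String filterName, PatternType filterType, List<Pattern> stored) {
        List<String> selected = new ArrayList<>();
        for (Pattern p : stored) {
            if (filterMatches(filterName, filterType, p)) {
                selected.add(p.toString());
            }
        }
        return selected;
    }

    // Sample patterns, assumed for illustration.
    public static List<Pattern> samplePatterns() {
        return Arrays.asList(
            new Pattern("payments.received", PatternType.LITERAL),
            new Pattern("*", PatternType.LITERAL),
            new Pattern("payments.", PatternType.PREFIXED),
            new Pattern("orders.", PatternType.PREFIXED),
            new Pattern("payments.received", PatternType.PREFIXED));
    }

    public static void main(String[] args) {
        for (PatternType t : PatternType.values()) {
            System.out.println(t + " -> " + select("payments.received", t, samplePatterns()));
        }
    }
}
```

With these five sample patterns, a `MATCH` filter for `payments.received` selects four of them (everything except the `orders.` prefix), while an `ANY` filter selects only the two whose name is exactly `payments.received`, regardless of their pattern type.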

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index 7e8d6f2..3b543f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -48,7 +48,7 @@ import java.util.TreeSet;
  * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
  * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
  *
- * Tha assignment will be:
+ * That assignment will be:
  * C0: [t0p0]
  * C1: [t1p0]
  * C2: [t1p1, t2p0, t2p1, t2p2]
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
index feba875..67dbfc0 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
@@ -18,8 +18,8 @@
 package org.apache.kafka.common.acl;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.Resource;
-import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.resource.ResourcePattern;
 
 import java.util.Objects;
@@ -54,7 +54,7 @@ public class AclBinding {
      */
     @Deprecated
     public AclBinding(Resource resource, AccessControlEntry entry) {
-        this(new ResourcePattern(resource.resourceType(), resource.name(), ResourceNameType.LITERAL), entry);
+        this(new ResourcePattern(resource.resourceType(), resource.name(), PatternType.LITERAL), entry);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
index 9d13fa7..3168ec6 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -18,8 +18,8 @@
 package org.apache.kafka.common.acl;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourceFilter;
-import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 
 import java.util.Objects;
@@ -59,7 +59,7 @@ public class AclBindingFilter {
      */
     @Deprecated
     public AclBindingFilter(ResourceFilter resourceFilter, AccessControlEntryFilter entryFilter) {
-        this(new ResourcePatternFilter(resourceFilter.resourceType(), resourceFilter.name(), ResourceNameType.LITERAL), entryFilter);
+        this(new ResourcePatternFilter(resourceFilter.resourceType(), resourceFilter.name(), PatternType.LITERAL), entryFilter);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index 96fa136..9eddf2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 
 public class CommonFields {
     public static final Field.Int32 THROTTLE_TIME_MS = new Field.Int32("throttle_time_ms",
@@ -46,8 +46,8 @@ public class CommonFields {
     public static final Field.Int8 RESOURCE_TYPE = new Field.Int8("resource_type", "The resource type");
     public static final Field.Str RESOURCE_NAME = new Field.Str("resource_name", "The resource name");
     public static final Field.NullableStr RESOURCE_NAME_FILTER = new Field.NullableStr("resource_name", "The resource name filter");
-    public static final Field.Int8 RESOURCE_NAME_TYPE = new Field.Int8("resource_name_type", "The resource name type", ResourceNameType.LITERAL.code());
-    public static final Field.Int8 RESOURCE_NAME_TYPE_FILTER = new Field.Int8("resource_name_type_filter", "The resource name type filter", ResourceNameType.LITERAL.code());
+    public static final Field.Int8 RESOURCE_PATTERN_TYPE = new Field.Int8("resource_pattten_type", "The resource pattern type", PatternType.LITERAL.code());
+    public static final Field.Int8 RESOURCE_PATTERN_TYPE_FILTER = new Field.Int8("resource_pattern_type_filter", "The resource pattern type filter", PatternType.LITERAL.code());
     public static final Field.Str PRINCIPAL = new Field.Str("principal", "The ACL principal");
     public static final Field.NullableStr PRINCIPAL_FILTER = new Field.NullableStr("principal", "The ACL principal filter");
     public static final Field.Str HOST = new Field.Str("host", "The ACL host");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index 6782f78..a77a373 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
@@ -38,7 +38,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class CreateAclsRequest extends AbstractRequest {
@@ -54,16 +54,16 @@ public class CreateAclsRequest extends AbstractRequest {
                     PERMISSION_TYPE))));
 
     /**
-     * Version 1 adds RESOURCE_NAME_TYPE.
-     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
+     * Version 1 adds RESOURCE_PATTERN_TYPE, to support more than just literal resource patterns.
+     * For more info, see {@link PatternType}.
      *
-     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
+     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
      */
     private static final Schema CREATE_ACLS_REQUEST_V1 = new Schema(
             new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema(
                     RESOURCE_TYPE,
                     RESOURCE_NAME,
-                    RESOURCE_NAME_TYPE,
+                    RESOURCE_PATTERN_TYPE,
                     PRINCIPAL,
                     HOST,
                     OPERATION,
@@ -180,10 +180,10 @@ public class CreateAclsRequest extends AbstractRequest {
             final boolean unsupported = aclCreations.stream()
                 .map(AclCreation::acl)
                 .map(AclBinding::pattern)
-                .map(ResourcePattern::nameType)
-                .anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
+                .map(ResourcePattern::patternType)
+                .anyMatch(patternType -> patternType != PatternType.LITERAL);
             if (unsupported) {
-                throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+                throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
             }
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 9bb15a3..24b5dab 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.utils.Utils;
 
@@ -39,7 +39,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class DeleteAclsRequest extends AbstractRequest {
@@ -55,16 +55,16 @@ public class DeleteAclsRequest extends AbstractRequest {
                     PERMISSION_TYPE))));
 
     /**
-     * V1 sees a new `RESOURCE_NAME_TYPE_FILTER` that controls how the filter handles different resource name types.
-     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
+     * V1 sees a new `RESOURCE_PATTERN_TYPE_FILTER` that controls how the filter handles different resource pattern types.
+     * For more info, see {@link PatternType}.
      *
-     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
+     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
      */
     private static final Schema DELETE_ACLS_REQUEST_V1 = new Schema(
             new Field(FILTERS, new ArrayOf(new Schema(
                     RESOURCE_TYPE,
                     RESOURCE_NAME_FILTER,
-                    RESOURCE_NAME_TYPE_FILTER,
+                    RESOURCE_PATTERN_TYPE_FILTER,
                     PRINCIPAL_FILTER,
                     HOST_FILTER,
                     OPERATION,
@@ -157,10 +157,10 @@ public class DeleteAclsRequest extends AbstractRequest {
         if (version == 0) {
             final boolean unsupported = filters.stream()
                 .map(AclBindingFilter::patternFilter)
-                .map(ResourcePatternFilter::nameType)
-                .anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
+                .map(ResourcePatternFilter::patternType)
+                .anyMatch(patternType -> patternType != PatternType.LITERAL);
             if (unsupported) {
-                throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+                throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
             }
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 112c6a3..a3b81cc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 
@@ -65,16 +65,16 @@ public class DeleteAclsResponse extends AbstractResponse {
             PERMISSION_TYPE);
 
     /**
-     * V1 sees a new `RESOURCE_NAME_TYPE` that describes how the resource name is interpreted.
+     * V1 sees a new `RESOURCE_PATTERN_TYPE` that defines the type of the resource pattern.
      *
-     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
+     * For more info, see {@link PatternType}.
      */
     private static final Schema MATCHING_ACL_V1 = new Schema(
             ERROR_CODE,
             ERROR_MESSAGE,
             RESOURCE_TYPE,
             RESOURCE_NAME,
-            RESOURCE_NAME_TYPE,
+            RESOURCE_PATTERN_TYPE,
             PRINCIPAL,
             HOST,
             OPERATION,
@@ -89,10 +89,10 @@ public class DeleteAclsResponse extends AbstractResponse {
                             new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL_V0), "The matching ACLs")))));
 
     /**
-     * V1 sees a new `RESOURCE_NAME_TYPE` field added to MATCHING_ACL_V1, that describes how the resource name is interpreted
+     * V1 sees a new `RESOURCE_PATTERN_TYPE` field added to MATCHING_ACL_V1, that describes how the resource pattern is interpreted
      * and version was bumped to indicate that, on quota violation, brokers send out responses before throttling.
      *
-     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
+     * For more info, see {@link PatternType}.
      */
     private static final Schema DELETE_ACLS_RESPONSE_V1 = new Schema(
             THROTTLE_TIME_MS,
@@ -248,10 +248,10 @@ public class DeleteAclsResponse extends AbstractResponse {
                 .flatMap(r -> r.deletions.stream())
                 .map(AclDeletionResult::acl)
                 .map(AclBinding::pattern)
-                .map(ResourcePattern::nameType)
-                .anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
+                .map(ResourcePattern::patternType)
+                .anyMatch(patternType -> patternType != PatternType.LITERAL);
             if (unsupported) {
-                throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+                throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
             }
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 5096360..acee3d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 
 import java.nio.ByteBuffer;
@@ -33,7 +33,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class DescribeAclsRequest extends AbstractRequest {
@@ -46,15 +46,15 @@ public class DescribeAclsRequest extends AbstractRequest {
             PERMISSION_TYPE);
 
     /**
-     * V1 sees a new `RESOURCE_NAME_TYPE_FILTER` that controls how the filter handles different resource name types.
-     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
+     * V1 sees a new `RESOURCE_PATTERN_TYPE_FILTER` that controls how the filter handles different resource pattern types.
+     * For more info, see {@link PatternType}.
      *
-     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
+     * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
      */
     private static final Schema DESCRIBE_ACLS_REQUEST_V1 = new Schema(
             RESOURCE_TYPE,
             RESOURCE_NAME_FILTER,
-            RESOURCE_NAME_TYPE_FILTER,
+            RESOURCE_PATTERN_TYPE_FILTER,
             PRINCIPAL_FILTER,
             HOST_FILTER,
             OPERATION,
@@ -130,8 +130,8 @@ public class DescribeAclsRequest extends AbstractRequest {
     }
 
     private void validate(AclBindingFilter filter, short version) {
-        if (version == 0 && filter.patternFilter().nameType() != ResourceNameType.LITERAL) {
-            throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+        if (version == 0 && filter.patternFilter().patternType() != PatternType.LITERAL) {
+            throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
         }
 
         if (filter.isUnknown()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index 66f2895..341845c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -27,7 +28,6 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceNameType;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -43,7 +43,7 @@ import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 
@@ -61,14 +61,14 @@ public class DescribeAclsResponse extends AbstractResponse {
                     PERMISSION_TYPE))));
 
     /**
-     * V1 sees a new `RESOURCE_NAME_TYPE` that describes how the resource name is interpreted.
+     * V1 sees a new `RESOURCE_PATTERN_TYPE` that defines the type of the resource pattern.
      *
-     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
+     * For more info, see {@link PatternType}.
      */
     private static final Schema DESCRIBE_ACLS_RESOURCE_V1 = new Schema(
             RESOURCE_TYPE,
             RESOURCE_NAME,
-            RESOURCE_NAME_TYPE,
+            RESOURCE_PATTERN_TYPE,
             new Field(ACLS_KEY_NAME, new ArrayOf(new Schema(
                     PRINCIPAL,
                     HOST,
@@ -82,10 +82,10 @@ public class DescribeAclsResponse extends AbstractResponse {
             new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE_V0), "The resources and their associated ACLs."));
 
     /**
-     * V1 sees a new `RESOURCE_NAME_TYPE` field added to DESCRIBE_ACLS_RESOURCE_V1, that describes how the resource name is interpreted
+     * V1 sees a new `RESOURCE_PATTERN_TYPE` field added to DESCRIBE_ACLS_RESOURCE_V1, that describes how the resource name is interpreted
      * and version was bumped to indicate that, on quota violation, brokers send out responses before throttling.
      *
-     * For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
+     * For more info, see {@link PatternType}.
      */
     private static final Schema DESCRIBE_ACLS_RESPONSE_V1 = new Schema(
             THROTTLE_TIME_MS,
@@ -186,10 +186,10 @@ public class DescribeAclsResponse extends AbstractResponse {
         if (version == 0) {
             final boolean unsupported = acls.stream()
                 .map(AclBinding::pattern)
-                .map(ResourcePattern::nameType)
-                .anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
+                .map(ResourcePattern::patternType)
+                .anyMatch(patternType -> patternType != PatternType.LITERAL);
             if (unsupported) {
-                throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
+                throw new UnsupportedVersionException("Version 0 only supports literal resource pattern types");
             }
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index 275fb16..7638c6c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -20,10 +20,10 @@ import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.resource.ResourceType;
 
 import static org.apache.kafka.common.protocol.CommonFields.HOST;
@@ -34,8 +34,8 @@ import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 final class RequestUtils {
@@ -45,29 +45,29 @@ final class RequestUtils {
     static ResourcePattern resourcePatternromStructFields(Struct struct) {
         byte resourceType = struct.get(RESOURCE_TYPE);
         String name = struct.get(RESOURCE_NAME);
-        ResourceNameType resourceNameType = ResourceNameType.fromCode(
-            struct.getOrElse(RESOURCE_NAME_TYPE, ResourceNameType.LITERAL.code()));
-        return new ResourcePattern(ResourceType.fromCode(resourceType), name, resourceNameType);
+        PatternType patternType = PatternType.fromCode(
+            struct.getOrElse(RESOURCE_PATTERN_TYPE, PatternType.LITERAL.code()));
+        return new ResourcePattern(ResourceType.fromCode(resourceType), name, patternType);
     }
 
     static void resourcePatternSetStructFields(ResourcePattern pattern, Struct struct) {
         struct.set(RESOURCE_TYPE, pattern.resourceType().code());
         struct.set(RESOURCE_NAME, pattern.name());
-        struct.setIfExists(RESOURCE_NAME_TYPE, pattern.nameType().code());
+        struct.setIfExists(RESOURCE_PATTERN_TYPE, pattern.patternType().code());
     }
 
     static ResourcePatternFilter resourcePatternFilterFromStructFields(Struct struct) {
         byte resourceType = struct.get(RESOURCE_TYPE);
         String name = struct.get(RESOURCE_NAME_FILTER);
-        ResourceNameType resourceNameType = ResourceNameType.fromCode(
-            struct.getOrElse(RESOURCE_NAME_TYPE_FILTER, ResourceNameType.LITERAL.code()));
-        return new ResourcePatternFilter(ResourceType.fromCode(resourceType), name, resourceNameType);
+        PatternType patternType = PatternType.fromCode(
+            struct.getOrElse(RESOURCE_PATTERN_TYPE_FILTER, PatternType.LITERAL.code()));
+        return new ResourcePatternFilter(ResourceType.fromCode(resourceType), name, patternType);
     }
 
     static void resourcePatternFilterSetStructFields(ResourcePatternFilter patternFilter, Struct struct) {
         struct.set(RESOURCE_TYPE, patternFilter.resourceType().code());
         struct.set(RESOURCE_NAME_FILTER, patternFilter.name());
-        struct.setIfExists(RESOURCE_NAME_TYPE_FILTER, patternFilter.nameType().code());
+        struct.setIfExists(RESOURCE_PATTERN_TYPE_FILTER, patternFilter.patternType().code());
     }
 
     static AccessControlEntry aceFromStructFields(Struct struct) {
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java b/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
similarity index 51%
rename from clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java
rename to clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
index 0e4fc0f..1233959 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
@@ -26,51 +26,64 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
- * Resource name type.
+ * Resource pattern type.
  */
 @InterfaceStability.Evolving
-public enum ResourceNameType {
+public enum PatternType {
     /**
-     * Represents any ResourceNameType which this client cannot understand, perhaps because this client is too old.
+     * Represents any PatternType which this client cannot understand, perhaps because this client is too old.
      */
     UNKNOWN((byte) 0),
 
     /**
-     * In a filter, matches any resource name type.
+     * In a filter, matches any resource pattern type.
      */
     ANY((byte) 1),
 
     /**
+     * In a filter, will perform pattern matching.
+     *
+     * e.g. Given a filter of {@code ResourcePatternFilter(TOPIC, "payments.received", MATCH)}, the filter will match
+     * any {@link ResourcePattern} that matches topic 'payments.received'. This might include:
+     * <ul>
+     *     <li>A Literal pattern with the same type and name, e.g. {@code ResourcePattern(TOPIC, "payments.received", LITERAL)}</li>
+     *     <li>A Wildcard pattern with the same type, e.g. {@code ResourcePattern(TOPIC, "*", LITERAL)}</li>
+     *     <li>A Prefixed pattern with the same type and where the name is a matching prefix, e.g. {@code ResourcePattern(TOPIC, "payments.", PREFIXED)}</li>
+     * </ul>
+     */
+    MATCH((byte) 2),
+
+    /**
      * A literal resource name.
      *
      * A literal name defines the full name of a resource, e.g. topic with name 'foo', or group with name 'bob'.
      *
      * The special wildcard character {@code *} can be used to represent a resource with any name.
      */
-    LITERAL((byte) 2),
+    LITERAL((byte) 3),
 
     /**
      * A prefixed resource name.
      *
      * A prefixed name defines a prefix for a resource, e.g. topics with names that start with 'foo'.
      */
-    PREFIXED((byte) 3);
+    PREFIXED((byte) 4);
 
-    private final static Map<Byte, ResourceNameType> CODE_TO_VALUE =
+    private final static Map<Byte, PatternType> CODE_TO_VALUE =
         Collections.unmodifiableMap(
-            Arrays.stream(ResourceNameType.values())
-                .collect(Collectors.toMap(ResourceNameType::code, Function.identity()))
+            Arrays.stream(PatternType.values())
+                .collect(Collectors.toMap(PatternType::code, Function.identity()))
         );
 
-    private final static Map<String, ResourceNameType> NAME_TO_VALUE =
+    private final static Map<String, PatternType> NAME_TO_VALUE =
         Collections.unmodifiableMap(
-            Arrays.stream(ResourceNameType.values())
-                .collect(Collectors.toMap(ResourceNameType::name, Function.identity()))
+            Arrays.stream(PatternType.values())
+                .collect(Collectors.toMap(PatternType::name, Function.identity()))
         );
 
     private final byte code;
 
-    ResourceNameType(byte code) {
+    PatternType(byte code) {
         this.code = code;
     }
 
@@ -82,23 +95,30 @@ public enum ResourceNameType {
     }
 
     /**
-     * Return whether this resource name type is UNKNOWN.
+     * @return whether this resource pattern type is UNKNOWN.
      */
     public boolean isUnknown() {
         return this == UNKNOWN;
     }
 
     /**
-     * Return the ResourceNameType with the provided code or {@link #UNKNOWN} if one cannot be found.
+     * @return whether this resource pattern type is a concrete type, rather than UNKNOWN or one of the filter types.
+     */
+    public boolean isSpecific() {
+        return this != UNKNOWN && this != ANY && this != MATCH;
+    }
+
+    /**
+     * Return the PatternType with the provided code or {@link #UNKNOWN} if one cannot be found.
      */
-    public static ResourceNameType fromCode(byte code) {
+    public static PatternType fromCode(byte code) {
         return CODE_TO_VALUE.getOrDefault(code, UNKNOWN);
     }
 
     /**
-     * Return the ResourceNameType with the provided name or {@link #UNKNOWN} if one cannot be found.
+     * Return the PatternType with the provided name or {@link #UNKNOWN} if one cannot be found.
      */
-    public static ResourceNameType fromString(String name) {
+    public static PatternType fromString(String name) {
         return NAME_TO_VALUE.getOrDefault(name, UNKNOWN);
     }
 }
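
Note on the renumbered codes: inserting MATCH as code 2 shifts LITERAL to 3 and PREFIXED to 4. The following is a minimal, self-contained sketch of the code lookup and the new isSpecific() check. SketchPatternType and PatternTypeDemo are hypothetical stand-ins, not the Kafka classes, and the lookup uses a linear scan rather than the precomputed map in the patch.

```java
// Hypothetical stand-in for org.apache.kafka.common.resource.PatternType.
// The byte codes mirror the values introduced by the patch above.
enum SketchPatternType {
    UNKNOWN((byte) 0),
    ANY((byte) 1),
    MATCH((byte) 2),
    LITERAL((byte) 3),
    PREFIXED((byte) 4);

    private final byte code;

    SketchPatternType(byte code) {
        this.code = code;
    }

    byte code() {
        return code;
    }

    // Concrete types only: UNKNOWN and the filter-only types ANY and MATCH are excluded.
    boolean isSpecific() {
        return this != UNKNOWN && this != ANY && this != MATCH;
    }

    // Linear scan (an assumption made for brevity); unrecognised codes fall back to UNKNOWN.
    static SketchPatternType fromCode(byte code) {
        for (SketchPatternType type : values()) {
            if (type.code == code) {
                return type;
            }
        }
        return UNKNOWN;
    }
}

final class PatternTypeDemo {
    public static void main(String[] args) {
        System.out.println(SketchPatternType.fromCode((byte) 3));   // prints LITERAL
        System.out.println(SketchPatternType.fromCode((byte) 99));  // prints UNKNOWN
        System.out.println(SketchPatternType.MATCH.isSpecific());   // prints false
    }
}
```
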
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
index c6aee91..2b7504f 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePattern.java
@@ -36,26 +36,26 @@ public class ResourcePattern {
 
     private final ResourceType resourceType;
     private final String name;
-    private final ResourceNameType nameType;
+    private final PatternType patternType;
 
    /**
      * Create a pattern using the supplied parameters.
      *
      * @param resourceType non-null, specific, resource type
      * @param name non-null resource name, which can be the {@link #WILDCARD_RESOURCE}.
-     * @param nameType non-null, specific, resource name type, which controls how the pattern will match resource names.
+     * @param patternType non-null, specific, resource pattern type, which controls how the pattern will match resource names.
      */
-    public ResourcePattern(ResourceType resourceType, String name, ResourceNameType nameType) {
+    public ResourcePattern(ResourceType resourceType, String name, PatternType patternType) {
         this.resourceType = Objects.requireNonNull(resourceType, "resourceType");
         this.name = Objects.requireNonNull(name, "name");
-        this.nameType = Objects.requireNonNull(nameType, "nameType");
+        this.patternType = Objects.requireNonNull(patternType, "patternType");
 
         if (resourceType == ResourceType.ANY) {
             throw new IllegalArgumentException("resourceType must not be ANY");
         }
 
-        if (nameType == ResourceNameType.ANY) {
-            throw new IllegalArgumentException("nameType must not be ANY");
+        if (patternType == PatternType.MATCH || patternType == PatternType.ANY) {
+            throw new IllegalArgumentException("patternType must not be " + patternType);
         }
     }
 
@@ -74,29 +74,29 @@ public class ResourcePattern {
     }
 
     /**
-     * @return the resource name type.
+     * @return the resource pattern type.
      */
-    public ResourceNameType nameType() {
-        return nameType;
+    public PatternType patternType() {
+        return patternType;
     }
 
     /**
      * @return a filter which matches only this pattern.
      */
     public ResourcePatternFilter toFilter() {
-        return new ResourcePatternFilter(resourceType, name, nameType);
+        return new ResourcePatternFilter(resourceType, name, patternType);
     }
 
     @Override
     public String toString() {
-        return "ResourcePattern(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", nameType=" + nameType + ")";
+        return "ResourcePattern(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", patternType=" + patternType + ")";
     }
 
     /**
      * @return {@code true} if this Resource has any UNKNOWN components.
      */
     public boolean isUnknown() {
-        return resourceType.isUnknown() || nameType.isUnknown();
+        return resourceType.isUnknown() || patternType.isUnknown();
     }
 
     @Override
@@ -109,11 +109,11 @@ public class ResourcePattern {
         final ResourcePattern resource = (ResourcePattern) o;
         return resourceType == resource.resourceType &&
             Objects.equals(name, resource.name) &&
-            nameType == resource.nameType;
+            patternType == resource.patternType;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(resourceType, name, nameType);
+        return Objects.hash(resourceType, name, patternType);
     }
 }
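
Note on the constructor check: a stored pattern may now carry neither of the two filter-only types, while UNKNOWN is still tolerated (see shouldNotThrowOnUnknownPatternType in AclBindingTest). A minimal sketch of that rule follows. PatternValidationSketch and its members are hypothetical names used only for illustration.

```java
// Hypothetical sketch of the pattern-type validation in the ResourcePattern constructor.
final class PatternValidationSketch {
    enum Type { UNKNOWN, ANY, MATCH, LITERAL, PREFIXED }

    // Rejects null and the filter-only types; UNKNOWN, LITERAL and PREFIXED pass.
    static void requirePatternType(Type type) {
        if (type == null) {
            throw new NullPointerException("patternType");
        }
        if (type == Type.MATCH || type == Type.ANY) {
            throw new IllegalArgumentException("patternType must not be " + type);
        }
    }

    // Convenience wrapper for the demo: true if the type is rejected as filter-only.
    static boolean isRejected(Type type) {
        try {
            requirePatternType(type);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        for (Type type : Type.values()) {
            System.out.println(type + " rejected: " + isRejected(type));
        }
    }
}
```
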
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePatternFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePatternFilter.java
index 8b4fdc0..83f5c88 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourcePatternFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourcePatternFilter.java
@@ -33,11 +33,11 @@ public class ResourcePatternFilter {
     /**
      * Matches any resource pattern.
      */
-    public static final ResourcePatternFilter ANY = new ResourcePatternFilter(ResourceType.ANY, null, ResourceNameType.ANY);
+    public static final ResourcePatternFilter ANY = new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY);
 
     private final ResourceType resourceType;
     private final String name;
-    private final ResourceNameType nameType;
+    private final PatternType patternType;
 
     /**
      * Create a filter using the supplied parameters.
@@ -48,22 +48,23 @@ public class ResourcePatternFilter {
      * @param name         resource name or {@code null}.
      *                     If {@code null}, the filter will ignore the name of resources.
      *                     If {@link ResourcePattern#WILDCARD_RESOURCE}, will match only wildcard patterns.
-     * @param nameType     non-null resource name type.
-     *                     If {@link ResourceNameType#ANY}, the filter will match patterns that would match any
-     *                     {@code ResourceNameType}, and also any wildcards patterns.
-     *                     If any other resource name type, the filter will match only patterns with the same type.
+     * @param patternType  non-null resource pattern type.
+     *                     If {@link PatternType#ANY}, the filter will match patterns regardless of pattern type.
+     *                     If {@link PatternType#MATCH}, the filter will match patterns that would match the supplied
+     *                     {@code name}, including any matching prefixed and wildcard patterns.
+     *                     If any other resource pattern type, the filter will match only patterns with the same type.
      */
-    public ResourcePatternFilter(ResourceType resourceType, String name, ResourceNameType nameType) {
+    public ResourcePatternFilter(ResourceType resourceType, String name, PatternType patternType) {
         this.resourceType = Objects.requireNonNull(resourceType, "resourceType");
         this.name = name;
-        this.nameType = Objects.requireNonNull(nameType, "nameType");
+        this.patternType = Objects.requireNonNull(patternType, "patternType");
     }
 
     /**
      * @return {@code true} if this filter has any UNKNOWN components.
      */
     public boolean isUnknown() {
-        return resourceType.isUnknown() || nameType.isUnknown();
+        return resourceType.isUnknown() || patternType.isUnknown();
     }
 
     /**
@@ -81,10 +82,10 @@ public class ResourcePatternFilter {
     }
 
     /**
-     * @return the resource name type.
+     * @return the resource pattern type.
      */
-    public ResourceNameType nameType() {
-        return nameType;
+    public PatternType patternType() {
+        return patternType;
     }
 
     /**
@@ -95,7 +96,7 @@ public class ResourcePatternFilter {
             return false;
         }
 
-        if (!nameType.equals(ResourceNameType.ANY) && !nameType.equals(pattern.nameType())) {
+        if (!patternType.equals(PatternType.ANY) && !patternType.equals(PatternType.MATCH) && !patternType.equals(pattern.patternType())) {
             return false;
         }
 
@@ -103,11 +104,11 @@ public class ResourcePatternFilter {
             return true;
         }
 
-        if (nameType.equals(pattern.nameType())) {
+        if (patternType.equals(PatternType.ANY) || patternType.equals(pattern.patternType())) {
             return name.equals(pattern.name());
         }
 
-        switch (pattern.nameType()) {
+        switch (pattern.patternType()) {
             case LITERAL:
                 return name.equals(pattern.name()) || pattern.name().equals(WILDCARD_RESOURCE);
 
@@ -115,7 +116,7 @@ public class ResourcePatternFilter {
                 return name.startsWith(pattern.name());
 
             default:
-                throw new IllegalArgumentException("Unsupported ResourceNameType: " + pattern.nameType());
+                throw new IllegalArgumentException("Unsupported PatternType: " + pattern.patternType());
         }
     }
 
@@ -137,16 +138,16 @@ public class ResourcePatternFilter {
             return "Resource type is UNKNOWN.";
         if (name == null)
             return "Resource name is NULL.";
-        if (nameType == ResourceNameType.ANY)
-            return "Resource name type is ANY.";
-        if (nameType == ResourceNameType.UNKNOWN)
-            return "Resource name type is UNKNOWN.";
+        if (patternType == PatternType.MATCH)
+            return "Resource pattern type is MATCH.";
+        if (patternType == PatternType.UNKNOWN)
+            return "Resource pattern type is UNKNOWN.";
         return null;
     }
 
     @Override
     public String toString() {
-        return "ResourcePattern(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", nameType=" + nameType + ")";
+        return "ResourcePattern(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", patternType=" + patternType + ")";
     }
 
     @Override
@@ -159,11 +160,11 @@ public class ResourcePatternFilter {
         final ResourcePatternFilter resource = (ResourcePatternFilter) o;
         return resourceType == resource.resourceType &&
             Objects.equals(name, resource.name) &&
-            nameType == resource.nameType;
+            patternType == resource.patternType;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(resourceType, name, nameType);
+        return Objects.hash(resourceType, name, patternType);
     }
 }
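
Note on ANY versus MATCH: the two filter types differ only when the filter supplies a name. ANY ignores the pattern type but still compares names exactly. MATCH also accepts the literal wildcard and any prefixed pattern whose name is a prefix of the filter name. The sketch below reproduces that decision order in a simplified form, assuming the resource type has already matched. MatchSketch and its members are hypothetical and use plain strings in place of ResourcePattern and ResourcePatternFilter.

```java
// Hypothetical, simplified sketch of the name and pattern-type part of
// ResourcePatternFilter.matches(); the resource-type check is omitted.
final class MatchSketch {
    enum Type { ANY, MATCH, LITERAL, PREFIXED }

    static final String WILDCARD = "*";

    // filterType and filterName describe the filter (filterName may be null);
    // patternType and patternName describe a stored pattern (LITERAL or PREFIXED).
    static boolean matches(Type filterType, String filterName, Type patternType, String patternName) {
        // A concrete filter type only matches patterns of the same type.
        if (filterType != Type.ANY && filterType != Type.MATCH && filterType != patternType) {
            return false;
        }
        // A null name means the filter ignores resource names.
        if (filterName == null) {
            return true;
        }
        // ANY, or an identical type, compares names exactly with no pattern matching.
        if (filterType == Type.ANY || filterType == patternType) {
            return filterName.equals(patternName);
        }
        // Only MATCH reaches this point: apply wildcard and prefix matching.
        switch (patternType) {
            case LITERAL:
                return filterName.equals(patternName) || patternName.equals(WILDCARD);
            case PREFIXED:
                return filterName.startsWith(patternName);
            default:
                throw new IllegalArgumentException("Unsupported pattern type: " + patternType);
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(Type.ANY, "Name", Type.LITERAL, "*"));                  // prints false
        System.out.println(matches(Type.MATCH, "Name", Type.LITERAL, "*"));                // prints true
        System.out.println(matches(Type.ANY, "Name-something", Type.PREFIXED, "Name"));    // prints false
        System.out.println(matches(Type.MATCH, "Name-something", Type.PREFIXED, "Name"));  // prints true
    }
}
```

These four calls correspond to the wildcard and prefix cases exercised in ResourcePatternFilterTest.
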
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d2f9887..3566f83 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -65,7 +65,7 @@ import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -535,13 +535,13 @@ public class KafkaAdminClientTest {
         }
     }
 
-    private static final AclBinding ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
+    private static final AclBinding ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
-    private static final AclBinding ACL2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic4", ResourceNameType.LITERAL),
+    private static final AclBinding ACL2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic4", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY));
-    private static final AclBindingFilter FILTER1 = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, ResourceNameType.LITERAL),
+    private static final AclBindingFilter FILTER1 = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, PatternType.LITERAL),
         new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY));
-    private static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, ResourceNameType.LITERAL),
+    private static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, null, PatternType.LITERAL),
         new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY));
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
index 4e41f98..461661f 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.acl;
 
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -30,19 +30,19 @@ import static org.junit.Assert.assertTrue;
 
 public class AclBindingTest {
     private static final AclBinding ACL1 = new AclBinding(
-        new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
+        new ResourcePattern(ResourceType.TOPIC, "mytopic", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
 
     private static final AclBinding ACL2 = new AclBinding(
-        new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
+        new ResourcePattern(ResourceType.TOPIC, "mytopic", PatternType.LITERAL),
         new AccessControlEntry("User:*", "", AclOperation.READ, AclPermissionType.ALLOW));
 
     private static final AclBinding ACL3 = new AclBinding(
-        new ResourcePattern(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL),
+        new ResourcePattern(ResourceType.TOPIC, "mytopic2", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
     private static final AclBinding UNKNOWN_ACL = new AclBinding(
-        new ResourcePattern(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL),
+        new ResourcePattern(ResourceType.TOPIC, "mytopic2", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY));
 
     private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter(
@@ -54,14 +54,14 @@ public class AclBindingTest {
         new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY));
 
     private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter(
-        new ResourcePatternFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
+        new ResourcePatternFilter(ResourceType.TOPIC, "mytopic", PatternType.LITERAL),
         new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
 
     @Test
     public void testMatching() {
         assertEquals(ACL1, ACL1);
         final AclBinding acl1Copy = new AclBinding(
-            new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
+            new ResourcePattern(ResourceType.TOPIC, "mytopic", PatternType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
         assertEquals(ACL1, acl1Copy);
         assertEquals(acl1Copy, ACL1);
@@ -115,22 +115,27 @@ public class AclBindingTest {
     }
 
     @Test
-    public void shouldNotThrowOnUnknownResourceNameType() {
-        new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.UNKNOWN), ACL1.entry());
+    public void shouldNotThrowOnUnknownPatternType() {
+        new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.UNKNOWN), ACL1.entry());
     }
 
     @Test
     public void shouldNotThrowOnUnknownResourceType() {
-        new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "foo", ResourceNameType.LITERAL), ACL1.entry());
+        new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "foo", PatternType.LITERAL), ACL1.entry());
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowOnAnyResourceNameType() {
-        new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.ANY), ACL1.entry());
+    public void shouldThrowOnMatchPatternType() {
+        new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.MATCH), ACL1.entry());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowOnAnyPatternType() {
+        new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.ANY), ACL1.entry());
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowOnAnyResourceType() {
-        new AclBinding(new ResourcePattern(ResourceType.ANY, "foo", ResourceNameType.LITERAL), ACL1.entry());
+        new AclBinding(new ResourcePattern(ResourceType.ANY, "foo", PatternType.LITERAL), ACL1.entry());
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternFilterTest.java b/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternFilterTest.java
index 87b25fc..08d5a63 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternFilterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternFilterTest.java
@@ -17,13 +17,13 @@
 
 package org.apache.kafka.common.acl;
 
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.junit.Test;
 
-import static org.apache.kafka.common.resource.ResourceNameType.LITERAL;
-import static org.apache.kafka.common.resource.ResourceNameType.PREFIXED;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
 import static org.apache.kafka.common.resource.ResourceType.ANY;
 import static org.apache.kafka.common.resource.ResourceType.GROUP;
 import static org.apache.kafka.common.resource.ResourceType.TOPIC;
@@ -32,26 +32,14 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class ResourcePatternFilterTest {
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowIfResourceTypeIsAny() {
-        new ResourcePatternFilter(ANY, null, ResourceNameType.ANY)
-            .matches(new ResourcePattern(ANY, "Name", PREFIXED));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowIfResourceNameTypeIsAny() {
-        new ResourcePatternFilter(ANY, null, ResourceNameType.ANY)
-            .matches(new ResourcePattern(GROUP, "Name", ResourceNameType.ANY));
-    }
-
     @Test
     public void shouldBeUnknownIfResourceTypeUnknown() {
-        assertTrue(new ResourcePatternFilter(UNKNOWN, null, ResourceNameType.LITERAL).isUnknown());
+        assertTrue(new ResourcePatternFilter(UNKNOWN, null, PatternType.LITERAL).isUnknown());
     }
 
     @Test
-    public void shouldBeUnknownIfResourceNameTypeUnknown() {
-        assertTrue(new ResourcePatternFilter(GROUP, null, ResourceNameType.UNKNOWN).isUnknown());
+    public void shouldBeUnknownIfPatternTypeUnknown() {
+        assertTrue(new ResourcePatternFilter(GROUP, null, PatternType.UNKNOWN).isUnknown());
     }
 
     @Test
@@ -73,7 +61,7 @@ public class ResourcePatternFilterTest {
     }
 
     @Test
-    public void shouldNotMatchIfDifferentNameType() {
+    public void shouldNotMatchIfDifferentPatternType() {
         assertFalse(new ResourcePatternFilter(TOPIC, "Name", LITERAL)
             .matches(new ResourcePattern(TOPIC, "Name", PREFIXED)));
     }
@@ -91,8 +79,14 @@ public class ResourcePatternFilterTest {
     }
 
     @Test
-    public void shouldMatchWhereResourceNameTypeIsAny() {
-        assertTrue(new ResourcePatternFilter(TOPIC, null, ResourceNameType.ANY)
+    public void shouldMatchWherePatternTypeIsAny() {
+        assertTrue(new ResourcePatternFilter(TOPIC, null, PatternType.ANY)
+            .matches(new ResourcePattern(TOPIC, "Name", PREFIXED)));
+    }
+
+    @Test
+    public void shouldMatchWherePatternTypeIsMatch() {
+        assertTrue(new ResourcePatternFilter(TOPIC, null, PatternType.MATCH)
             .matches(new ResourcePattern(TOPIC, "Name", PREFIXED)));
     }
 
@@ -103,14 +97,20 @@ public class ResourcePatternFilterTest {
     }
 
     @Test
-    public void shouldMatchLiteralIfNameMatchesAndFilterIsOnAnyNameType() {
-        assertTrue(new ResourcePatternFilter(TOPIC, "Name", ResourceNameType.ANY)
+    public void shouldMatchLiteralIfNameMatchesAndFilterIsOnPatternTypeAny() {
+        assertTrue(new ResourcePatternFilter(TOPIC, "Name", PatternType.ANY)
+            .matches(new ResourcePattern(TOPIC, "Name", LITERAL)));
+    }
+
+    @Test
+    public void shouldMatchLiteralIfNameMatchesAndFilterIsOnPatternTypeMatch() {
+        assertTrue(new ResourcePatternFilter(TOPIC, "Name", PatternType.MATCH)
             .matches(new ResourcePattern(TOPIC, "Name", LITERAL)));
     }
 
     @Test
     public void shouldNotMatchLiteralIfNamePrefixed() {
-        assertFalse(new ResourcePatternFilter(TOPIC, "Name-something", ResourceNameType.ANY)
+        assertFalse(new ResourcePatternFilter(TOPIC, "Name-something", PatternType.MATCH)
             .matches(new ResourcePattern(TOPIC, "Name", LITERAL)));
     }
 
@@ -133,8 +133,14 @@ public class ResourcePatternFilterTest {
     }
 
     @Test
-    public void shouldMatchLiteralWildcardIfFilterHasNameTypeOfAny() {
-        assertTrue(new ResourcePatternFilter(TOPIC, "Name", ResourceNameType.ANY)
+    public void shouldNotMatchLiteralWildcardIfFilterHasPatternTypeOfAny() {
+        assertFalse(new ResourcePatternFilter(TOPIC, "Name", PatternType.ANY)
+            .matches(new ResourcePattern(TOPIC, "*", LITERAL)));
+    }
+
+    @Test
+    public void shouldMatchLiteralWildcardIfFilterHasPatternTypeOfMatch() {
+        assertTrue(new ResourcePatternFilter(TOPIC, "Name", PatternType.MATCH)
             .matches(new ResourcePattern(TOPIC, "*", LITERAL)));
     }
 
@@ -157,8 +163,14 @@ public class ResourcePatternFilterTest {
     }
 
     @Test
-    public void shouldMatchPrefixedIfNamePrefixedAnyFilterTypeIsAny() {
-        assertTrue(new ResourcePatternFilter(TOPIC, "Name-something", ResourceNameType.ANY)
+    public void shouldNotMatchPrefixedIfNamePrefixedAnyFilterTypeIsAny() {
+        assertFalse(new ResourcePatternFilter(TOPIC, "Name-something", PatternType.ANY)
+            .matches(new ResourcePattern(TOPIC, "Name", PREFIXED)));
+    }
+
+    @Test
+    public void shouldMatchPrefixedIfNamePrefixedAnyFilterTypeIsMatch() {
+        assertTrue(new ResourcePatternFilter(TOPIC, "Name-something", PatternType.MATCH)
             .matches(new ResourcePattern(TOPIC, "Name", PREFIXED)));
     }
 }
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternTest.java b/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternTest.java
index d76e213..d3538e0 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/ResourcePatternTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.common.acl;
 
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
@@ -25,16 +25,21 @@ import org.junit.Test;
 public class ResourcePatternTest {
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfResourceTypeIsAny() {
-        new ResourcePattern(ResourceType.ANY, "name", ResourceNameType.LITERAL);
+        new ResourcePattern(ResourceType.ANY, "name", PatternType.LITERAL);
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowIfResourceNameTypeIsAny() {
-        new ResourcePattern(ResourceType.TOPIC, "name", ResourceNameType.ANY);
+    public void shouldThrowIfPatternTypeIsMatch() {
+        new ResourcePattern(ResourceType.TOPIC, "name", PatternType.MATCH);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfPatternTypeIsAny() {
+        new ResourcePattern(ResourceType.TOPIC, "name", PatternType.ANY);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfResourceNameIsNull() {
-        new ResourcePattern(ResourceType.TOPIC, null, ResourceNameType.ANY);
+        new ResourcePattern(ResourceType.TOPIC, null, PatternType.ANY);
     }
 }
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
index 1f3c15c..5642677 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
@@ -39,16 +39,16 @@ public class CreateAclsRequestTest {
     private static final short V0 = 0;
     private static final short V1 = 1;
 
-    private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+    private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
-    private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", ResourceNameType.LITERAL),
+    private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", PatternType.LITERAL),
         new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
 
-    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
         new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    private static final AclBinding UNKNOWN_ACL1 = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "unknown", ResourceNameType.LITERAL),
+    private static final AclBinding UNKNOWN_ACL1 = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "unknown", PatternType.LITERAL),
         new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
     @Test(expected = UnsupportedVersionException.class)
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
index 42b7dcc..4311813 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
@@ -37,16 +37,16 @@ public class DeleteAclsRequestTest {
     private static final short V0 = 0;
     private static final short V1 = 1;
 
-    private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+    private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foo", PatternType.LITERAL),
         new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
-    private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+    private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "prefix", ResourceNameType.PREFIXED),
+    private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "prefix", PatternType.PREFIXED),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
     @Test(expected = UnsupportedVersionException.class)
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
index b6d9f92..f8bec15 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
@@ -21,12 +21,12 @@ import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
-import org.apache.kafka.common.resource.ResourceNameType;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
@@ -41,16 +41,16 @@ public class DeleteAclsResponseTest {
     private static final short V0 = 0;
     private static final short V1 = 1;
 
-    private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+    private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
-    private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", ResourceNameType.LITERAL),
+    private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", PatternType.LITERAL),
         new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
 
-    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
         new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    private static final AclBinding UNKNOWN_ACL = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "group", ResourceNameType.LITERAL),
+    private static final AclBinding UNKNOWN_ACL = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "group", PatternType.LITERAL),
         new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
 
     private static final AclFilterResponse LITERAL_RESPONSE = new AclFilterResponse(aclDeletions(LITERAL_ACL1, LITERAL_ACL2));
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
index 3e4e531..d7c6593 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
@@ -34,16 +34,16 @@ public class DescribeAclsRequestTest {
     private static final short V0 = 0;
     private static final short V1 = 1;
 
-    private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+    private static final AclBindingFilter LITERAL_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foo", PatternType.LITERAL),
         new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
-    private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+    private static final AclBindingFilter PREFIXED_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+    private static final AclBindingFilter ANY_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
         new AccessControlEntryFilter("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "foo", ResourceNameType.LITERAL),
+    private static final AclBindingFilter UNKNOWN_FILTER = new AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "foo", PatternType.LITERAL),
         new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
     @Test(expected = UnsupportedVersionException.class)
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
index 2d3ac8f..13a3ebb 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
@@ -39,16 +39,16 @@ public class DescribeAclsResponseTest {
     private static final short V0 = 0;
     private static final short V1 = 1;
 
-    private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.LITERAL),
+    private static final AclBinding LITERAL_ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
-    private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", ResourceNameType.LITERAL),
+    private static final AclBinding LITERAL_ACL2 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "group", PatternType.LITERAL),
         new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW));
 
-    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", ResourceNameType.PREFIXED),
+    private static final AclBinding PREFIXED_ACL1 = new AclBinding(new ResourcePattern(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
         new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE, AclPermissionType.ALLOW));
 
-    private static final AclBinding UNKNOWN_ACL = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "foo", ResourceNameType.LITERAL),
+    private static final AclBinding UNKNOWN_ACL = new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "foo", PatternType.LITERAL),
         new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY));
 
     @Test(expected = UnsupportedVersionException.class)
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e09cc9b..6e705d2 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
-import org.apache.kafka.common.resource.ResourceNameType;
+import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -1088,23 +1088,23 @@ public class RequestResponseTest {
 
     private DescribeAclsRequest createListAclsRequest() {
         return new DescribeAclsRequest.Builder(new AclBindingFilter(
-                new ResourcePatternFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
+                new ResourcePatternFilter(ResourceType.TOPIC, "mytopic", PatternType.LITERAL),
                 new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).build();
     }
 
     private DescribeAclsResponse createDescribeAclsResponse() {
         return new DescribeAclsResponse(0, ApiError.NONE, Collections.singleton(new AclBinding(
-            new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
+            new ResourcePattern(ResourceType.TOPIC, "mytopic", PatternType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))));
     }
 
     private CreateAclsRequest createCreateAclsRequest() {
         List<AclCreation> creations = new ArrayList<>();
         creations.add(new AclCreation(new AclBinding(
-            new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
+            new ResourcePattern(ResourceType.TOPIC, "mytopic", PatternType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.ALLOW))));
         creations.add(new AclCreation(new AclBinding(
-            new ResourcePattern(ResourceType.GROUP, "mygroup", ResourceNameType.LITERAL),
+            new ResourcePattern(ResourceType.GROUP, "mygroup", PatternType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))));
         return new CreateAclsRequest.Builder(creations).build();
     }
@@ -1117,10 +1117,10 @@ public class RequestResponseTest {
     private DeleteAclsRequest createDeleteAclsRequest() {
         List<AclBindingFilter> filters = new ArrayList<>();
         filters.add(new AclBindingFilter(
-            new ResourcePatternFilter(ResourceType.ANY, null, ResourceNameType.LITERAL),
+            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.LITERAL),
             new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)));
         filters.add(new AclBindingFilter(
-            new ResourcePatternFilter(ResourceType.ANY, null, ResourceNameType.LITERAL),
+            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.LITERAL),
             new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY)));
         return new DeleteAclsRequest.Builder(filters).build();
     }
@@ -1129,10 +1129,10 @@ public class RequestResponseTest {
         List<AclFilterResponse> responses = new ArrayList<>();
         responses.add(new AclFilterResponse(Utils.mkSet(
                 new AclDeletionResult(new AclBinding(
-                        new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
+                        new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
                         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))),
                 new AclDeletionResult(new AclBinding(
-                        new ResourcePattern(ResourceType.TOPIC, "mytopic4", ResourceNameType.LITERAL),
+                        new ResourcePattern(ResourceType.TOPIC, "mytopic4", PatternType.LITERAL),
                         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY))))));
         responses.add(new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"),
             Collections.<AclDeletionResult>emptySet()));
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index d223945..e86e1a3 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -25,13 +25,13 @@ import kafka.utils._
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.resource.{ResourcePatternFilter, ResourceNameType, ResourceType => JResourceType, Resource => JResource}
+import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
 
 import scala.collection.JavaConverters._
 
 object AclCommand extends Logging {
 
-  val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, ResourceNameType.LITERAL)
+  val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)
 
   private val Newline = scala.util.Properties.lineSeparator
 
@@ -87,13 +87,14 @@ object AclCommand extends Logging {
   }
 
   private def addAcl(opts: AclCommandOptions) {
-    if (opts.options.valueOf(opts.resourceNameType) == ResourceNameType.ANY)
-      CommandLineUtils.printUsageAndDie(opts.parser, "A '--resource-name-type' value of 'Any' is not valid when adding acls.")
+    val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType)
+    if (patternType == PatternType.MATCH || patternType == PatternType.ANY)
+      CommandLineUtils.printUsageAndDie(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.")
 
     withAuthorizer(opts) { authorizer =>
       val resourceToAcl = getResourceFilterToAcls(opts).map {
         case (filter, acls) =>
-          Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.nameType()) -> acls
+          Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.patternType()) -> acls
       }
 
       if (resourceToAcl.values.exists(_.isEmpty))
@@ -262,24 +263,24 @@ object AclCommand extends Logging {
   }
 
   private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = {
-    val resourceNameType: ResourceNameType = opts.options.valueOf(opts.resourceNameType)
+    val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType)
 
     var resourceFilters = Set.empty[ResourcePatternFilter]
     if (opts.options.has(opts.topicOpt))
-      opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resourceFilters += new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, resourceNameType))
+      opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resourceFilters += new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, patternType))
 
-    if (resourceNameType == ResourceNameType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
+    if (patternType == PatternType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
       resourceFilters += ClusterResourceFilter
 
     if (opts.options.has(opts.groupOpt))
-      opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resourceFilters += new ResourcePatternFilter(JResourceType.GROUP, group.trim, resourceNameType))
+      opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resourceFilters += new ResourcePatternFilter(JResourceType.GROUP, group.trim, patternType))
 
     if (opts.options.has(opts.transactionalIdOpt))
       opts.options.valuesOf(opts.transactionalIdOpt).asScala.foreach(transactionalId =>
-        resourceFilters += new ResourcePatternFilter(JResourceType.TRANSACTIONAL_ID, transactionalId, resourceNameType))
+        resourceFilters += new ResourcePatternFilter(JResourceType.TRANSACTIONAL_ID, transactionalId, patternType))
 
     if (opts.options.has(opts.delegationTokenOpt))
-      opts.options.valuesOf(opts.delegationTokenOpt).asScala.foreach(token => resourceFilters += new ResourcePatternFilter(JResourceType.DELEGATION_TOKEN, token.trim, resourceNameType))
+      opts.options.valuesOf(opts.delegationTokenOpt).asScala.foreach(token => resourceFilters += new ResourcePatternFilter(JResourceType.DELEGATION_TOKEN, token.trim, patternType))
 
     if (resourceFilters.isEmpty && dieIfNoResourceFound)
       CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>")
@@ -345,11 +346,16 @@ object AclCommand extends Logging {
       .describedAs("delegation-token")
       .ofType(classOf[String])
 
-    val resourceNameType = parser.accepts("resource-name-type", "The type of the resource name, or any.")
+    val resourcePatternType = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " +
+      "When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. " +
+      "When listing or removing acls, a specific pattern type can be used to list or remove acls from specific resource patterns, " +
+      "or use the filter values of 'any' or 'match', where 'any' will match any pattern type, but will match the resource name exactly, " +
+      "whereas 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). " +
+      "WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.")
       .withRequiredArg()
       .ofType(classOf[String])
-      .withValuesConvertedBy(new ResourceNameTypeConverter())
-      .defaultsTo(ResourceNameType.LITERAL)
+      .withValuesConvertedBy(new PatternTypeConverter())
+      .defaultsTo(PatternType.LITERAL)
 
     val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
     val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
@@ -429,17 +435,17 @@ object AclCommand extends Logging {
 
 }
 
-class ResourceNameTypeConverter extends EnumConverter[ResourceNameType](classOf[ResourceNameType]) {
+class PatternTypeConverter extends EnumConverter[PatternType](classOf[PatternType]) {
 
-  override def convert(value: String): ResourceNameType = {
-    val nameType = super.convert(value)
-    if (nameType.isUnknown)
-      throw new ValueConversionException("Unknown resourceNameType: " + value)
+  override def convert(value: String): PatternType = {
+    val patternType = super.convert(value)
+    if (patternType.isUnknown)
+      throw new ValueConversionException("Unknown resource-pattern-type: " + value)
 
-    nameType
+    patternType
   }
 
-  override def valuePattern: String = ResourceNameType.values
-    .filter(_ != ResourceNameType.UNKNOWN)
+  override def valuePattern: String = PatternType.values
+    .filter(_ != PatternType.UNKNOWN)
     .mkString("|")
 }
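
Note on the AclCommand change above: the add path now rejects both filter-only values ('match' and 'any'), and the converter separately rejects unknown values. A minimal, self-contained Java sketch of that rule follows. The nested PatternType enum and the requireSpecific helper are hypothetical stand-ins written for this illustration; they are not the Kafka classes, and the sketch folds the converter's unknown-value check and the addAcl check into one method for brevity.

```java
import java.util.Locale;

public class PatternTypeSketch {
    /** Hypothetical stand-in for org.apache.kafka.common.resource.PatternType. */
    enum PatternType { UNKNOWN, ANY, MATCH, LITERAL, PREFIXED }

    /** True only for pattern types that can be attached to a stored ACL. */
    static boolean isSpecific(PatternType type) {
        return type == PatternType.LITERAL || type == PatternType.PREFIXED;
    }

    /**
     * Parses a command-line value and rejects anything that is not a
     * specific pattern type, as the add path in the diff above does.
     */
    static PatternType requireSpecific(String raw) {
        PatternType type;
        try {
            type = PatternType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Unrecognised input is treated like UNKNOWN (assumption of this sketch).
            type = PatternType.UNKNOWN;
        }
        if (!isSpecific(type))
            throw new IllegalArgumentException(
                "A '--resource-pattern-type' value of '" + raw + "' is not valid when adding acls.");
        return type;
    }

    public static void main(String[] args) {
        System.out.println(requireSpecific("literal"));
        System.out.println(requireSpecific("prefixed"));
        try {
            requireSpecific("match");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

When listing or removing ACLs the same values remain legal as filters, which is why the check sits only on the add path.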
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 8ec7f95..65c3506 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -16,6 +16,7 @@
  */
 package kafka.common
 
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.atomic.AtomicBoolean
 
@@ -102,7 +103,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
     val (data, _) = zkClient.getDataAndStat(changeZnode)
     data match {
       case Some(d) => Try(notificationHandler.processNotification(d)) match {
-        case Failure(e) => error(s"error processing change notification from $changeZnode", e)
+        case Failure(e) => error(s"error processing change notification ${new String(d, UTF_8)} from $changeZnode", e)
         case _ =>
       }
       case None => warn(s"read null data from $changeZnode")
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
index 3d0f52e..74bd404 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -35,7 +35,7 @@ object SecurityUtils {
       principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
-      resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.nameType)
+      resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.patternType)
       acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
     } yield (resource, acl)) match {
       case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
@@ -44,7 +44,7 @@ object SecurityUtils {
   }
 
   def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
-    val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType)
+    val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType)
     val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
       acl.operation.toJava, acl.permissionType.toJava)
     new AclBinding(resourcePattern, entry)
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 6875dc6..9be8e6c 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -37,7 +37,7 @@ trait Authorizer extends Configurable {
   /**
    * @param session The session being authenticated.
    * @param operation Type of operation client is trying to perform on resource.
-   * @param resource Resource the client is trying to access. Resource name type is always literal in input resource.
+   * @param resource Resource the client is trying to access. Resource pattern type is always literal in input resource.
    * @return true if the operation should be permitted, false otherwise
    */
   def authorize(session: Session, operation: Operation, resource: Resource): Boolean
@@ -59,8 +59,8 @@ trait Authorizer extends Configurable {
    *
    * @param acls set of acls to add to existing acls
    * @param resource the resource path to which these acls should be attached.
-   *                the supplied resource will have a specific resource name type,
-   *                i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
+   *                the supplied resource will have a specific resource pattern type,
+   *                i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
    */
   def addAcls(acls: Set[Acl], resource: Resource): Unit
 
@@ -80,8 +80,8 @@ trait Authorizer extends Configurable {
    *
    * @param acls set of acls to be removed.
    * @param resource resource path from which the acls should be removed.
-   *                 the supplied resource will have a specific resource name type,
-   *                 i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
+   *                 the supplied resource will have a specific resource pattern type,
+   *                 i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
    * @return true if some acl got removed, false if no acl was removed.
    */
   def removeAcls(acls: Set[Acl], resource: Resource): Boolean
@@ -101,8 +101,8 @@ trait Authorizer extends Configurable {
    * {code}
    *
    * @param resource the resource path from which these acls should be removed.
-   *                 the supplied resource will have a specific resource name type,
-   *                 i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
+   *                 the supplied resource will have a specific resource pattern type,
+   *                 i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
    * @return
    */
   def removeAcls(resource: Resource): Boolean
@@ -122,8 +122,8 @@ trait Authorizer extends Configurable {
    * {code}
    *
    * @param resource the resource path to which the acls belong.
-   *                 the supplied resource will have a specific resource name type,
-   *                 i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
+   *                 the supplied resource will have a specific resource pattern type,
+   *                 i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
    * @return empty set if no acls are found, otherwise the acls for the resource.
    */
   def getAcls(resource: Resource): Set[Acl]
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 78f0483..c475596 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -17,12 +17,12 @@
 package kafka.security.auth
 
 import kafka.common.KafkaException
-import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern}
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern}
 
 object Resource {
   val Separator = ":"
   val ClusterResourceName = "kafka-cluster"
-  val ClusterResource = Resource(Cluster, Resource.ClusterResourceName, ResourceNameType.LITERAL)
+  val ClusterResource = Resource(Cluster, Resource.ClusterResourceName, PatternType.LITERAL)
   val ProducerIdResourceName = "producer-id"
   val WildCardResource = "*"
 
@@ -32,13 +32,13 @@ object Resource {
       case Some(resourceType) =>
         val remaining = str.substring(resourceType.name.length + 1)
 
-        ResourceNameType.values.find(nameType => remaining.startsWith(nameType.name + Separator)) match {
-          case Some(nameType) =>
-            val name = remaining.substring(nameType.name.length + 1)
-            Resource(resourceType, name, nameType)
+        PatternType.values.find(patternType => remaining.startsWith(patternType.name + Separator)) match {
+          case Some(patternType) =>
+            val name = remaining.substring(patternType.name.length + 1)
+            Resource(resourceType, name, patternType)
 
           case None =>
-            Resource(resourceType, remaining, ResourceNameType.LITERAL)
+            Resource(resourceType, remaining, PatternType.LITERAL)
         }
     }
   }
@@ -49,35 +49,35 @@ object Resource {
  * @param resourceType non-null type of resource.
  * @param name non-null name of the resource, for topic this will be topic name , for group it will be group name. For cluster type
  *             it will be a constant string kafka-cluster.
- * @param nameType non-null type of resource name: literal, prefixed, etc.
+ * @param patternType non-null resource pattern type: literal, prefixed, etc.
  */
-case class Resource(resourceType: ResourceType, name: String, nameType: ResourceNameType) {
+case class Resource(resourceType: ResourceType, name: String, patternType: PatternType) {
 
-  if (nameType == ResourceNameType.ANY)
-    throw new IllegalArgumentException("nameType must not be ANY")
+  if (patternType == PatternType.MATCH || patternType == PatternType.ANY)
+    throw new IllegalArgumentException("patternType must not be " + patternType)
 
-  if (nameType == ResourceNameType.UNKNOWN)
-    throw new IllegalArgumentException("nameType must not be UNKNOWN")
+  if (patternType == PatternType.UNKNOWN)
+    throw new IllegalArgumentException("patternType must not be UNKNOWN")
 
   /**
     * Create an instance of this class with the provided parameters.
-    * Resource name type would default to ResourceNameType.LITERAL.
+    * Resource pattern type would default to PatternType.LITERAL.
     *
     * @param resourceType non-null resource type
     * @param name         non-null resource name
-    * @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, ResourceNameType)]]
+    * @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, PatternType)]]
     */
-  @deprecated("Use Resource(ResourceType, String, ResourceNameType", "Since 2.0")
+  @deprecated("Use Resource(ResourceType, String, PatternType)", "Since 2.0")
   def this(resourceType: ResourceType, name: String) {
-    this(resourceType, name, ResourceNameType.LITERAL)
+    this(resourceType, name, PatternType.LITERAL)
   }
 
   def toPattern: ResourcePattern = {
-    new ResourcePattern(resourceType.toJava, name, nameType)
+    new ResourcePattern(resourceType.toJava, name, patternType)
   }
 
   override def toString: String = {
-    resourceType.name + Resource.Separator + nameType + Resource.Separator + name
+    resourceType.name + Resource.Separator + patternType + Resource.Separator + name
   }
 }
 
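Note on the Resource.scala hunks above: the string form of a resource is "type:patternType:name", and the parser falls back to a LITERAL pattern when no pattern type prefix is present, so the older "type:name" form still parses. The following is a minimal, dependency-free sketch of that round trip. The names Res, PatternTypes and ResourceTypes, and the error message, are hypothetical stand-ins for illustration and are not the Kafka classes.

```scala
// Hypothetical stand-in for the pattern types a concrete resource may carry.
object PatternTypes extends Enumeration {
  val LITERAL, PREFIXED = Value
}

// Hypothetical stand-in for kafka.security.auth.Resource.
final case class Res(resourceType: String, name: String, patternType: PatternTypes.Value) {
  // Same layout as the toString in the diff: type, pattern type, name.
  override def toString: String = resourceType + ":" + patternType + ":" + name
}

object Res {
  private val Separator = ":"
  // Assumed list of resource type names, for illustration only.
  private val ResourceTypes = Seq("Topic", "Group", "Cluster")

  def fromString(str: String): Res =
    ResourceTypes.find(rt => str.startsWith(rt + Separator)) match {
      case None =>
        throw new IllegalArgumentException("unrecognised resource string: " + str)
      case Some(rt) =>
        val remaining = str.substring(rt.length + 1)
        PatternTypes.values.find(pt => remaining.startsWith(pt.toString + Separator)) match {
          // New format: a pattern type sits between the type and the name.
          case Some(pt) => Res(rt, remaining.substring(pt.toString.length + 1), pt)
          // Old format: no pattern type, so treat the rest as a literal name.
          case None => Res(rt, remaining, PatternTypes.LITERAL)
        }
    }
}
```

With this sketch, Res.fromString("Topic:foo") and Res.fromString("Topic:LITERAL:foo") yield the same value.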
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index cecad0e..504d71a 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -28,7 +28,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.{AclChangeNotificationHandler, AclChangeSubscription, KafkaZkClient, ZkAclChangeStore, ZkAclStore}
 import org.apache.kafka.common.errors.UnsupportedVersionException
-import org.apache.kafka.common.resource.ResourceNameType
+import org.apache.kafka.common.resource.PatternType
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 
@@ -106,8 +106,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
-    if (resource.nameType != ResourceNameType.LITERAL) {
-      throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.nameType)
+    if (resource.patternType != PatternType.LITERAL) {
+      throw new IllegalArgumentException("Only literal resources are supported. Got: " + resource.patternType)
     }
 
     val principal = session.principal
@@ -165,7 +165,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   override def addAcls(acls: Set[Acl], resource: Resource) {
     if (acls != null && acls.nonEmpty) {
-      if (!extendedAclSupport && resource.nameType == ResourceNameType.PREFIXED) {
+      if (!extendedAclSupport && resource.patternType == PatternType.PREFIXED) {
         throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
           s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater")
       }
@@ -213,17 +213,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   def getMatchingAcls(resourceType: ResourceType, resourceName: String): Set[Acl] = {
     inReadLock(lock) {
-      val wildcard = aclCache.get(Resource(resourceType, Acl.WildCardResource, ResourceNameType.LITERAL))
+      val wildcard = aclCache.get(Resource(resourceType, Acl.WildCardResource, PatternType.LITERAL))
         .map(_.acls)
         .getOrElse(Set.empty[Acl])
 
-      val literal = aclCache.get(Resource(resourceType, resourceName, ResourceNameType.LITERAL))
+      val literal = aclCache.get(Resource(resourceType, resourceName, PatternType.LITERAL))
         .map(_.acls)
         .getOrElse(Set.empty[Acl])
 
       val prefixed = aclCache.range(
-        Resource(resourceType, resourceName, ResourceNameType.PREFIXED),
-        Resource(resourceType, resourceName.substring(0, 1), ResourceNameType.PREFIXED)
+        Resource(resourceType, resourceName, PatternType.PREFIXED),
+        Resource(resourceType, resourceName.substring(0, 1), PatternType.PREFIXED)
       )
         .filterKeys(resource => resourceName.startsWith(resource.name))
         .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
@@ -364,7 +364,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }
   }
 
-  // Orders by resource type, then resource name type and finally reverse ordering by name.
+  // Orders by resource type, then resource pattern type and finally reverse ordering by name.
   private object ResourceOrdering extends Ordering[Resource] {
 
     def compare(a: Resource, b: Resource): Int = {
@@ -372,7 +372,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       if (rt != 0)
         rt
       else {
-        val rnt = a.nameType compareTo b.nameType
+        val rnt = a.patternType compareTo b.patternType
         if (rnt != 0)
           rnt
         else
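Note on the SimpleAclAuthorizer.scala hunks above: getMatchingAcls relies on the cache being sorted in reverse order by name, so that every candidate prefix of a resource name falls in one contiguous range, which is then narrowed with startsWith. The following is a simplified sketch of that idea using a plain TreeMap keyed by prefix only. PrefixLookup, its cache contents and the "acl-*" strings are hypothetical, and the real cache is keyed by Resource with resource type and pattern type compared first.

```scala
import scala.collection.immutable.TreeMap

object PrefixLookup {
  // Reverse lexicographic ordering, mirroring "reverse ordering by name".
  private val reverseByName: Ordering[String] = Ordering.String.reverse

  // Hypothetical cache: prefix -> ACL descriptions.
  val cache: TreeMap[String, Set[String]] = TreeMap[String, Set[String]](
    "mytopic"   -> Set("acl-a"),
    "my"        -> Set("acl-b"),
    "other"     -> Set("acl-c"),
    "mytopic22" -> Set("acl-d")
  )(reverseByName)

  // Assumes resourceName is non-empty, as substring(0, 1) would otherwise throw.
  def matchingPrefixes(cache: TreeMap[String, Set[String]], resourceName: String): Set[String] =
    cache
      // Under the reversed ordering this selects keys that sort at or below
      // resourceName and above its first character. `range` excludes the
      // upper bound, so in this sketch a prefix equal to that single
      // character is not returned.
      .range(resourceName, resourceName.substring(0, 1))
      // Keys in the range share a first character but need not be prefixes.
      .filter { case (prefix, _) => resourceName.startsWith(prefix) }
      .values.flatten.toSet
}
```

For the name "mytopic2", the range skips "other" and "mytopic22" without inspecting them as prefixes, and the startsWith filter keeps "mytopic" and "my".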
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 37a11bd..ebdf141 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -52,7 +52,7 @@ import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, A
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.resource.ResourceNameType.LITERAL
+import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -1923,7 +1923,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val filter = describeAclsRequest.filter()
         val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) =>
           acls.flatMap { acl =>
-            val fixture = new AclBinding(new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType),
+            val fixture = new AclBinding(new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType),
                 new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
             Some(fixture).filter(filter.matches)
           }
@@ -1996,7 +1996,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val filtersWithIndex = filters.zipWithIndex
           for ((resource, acls) <- aclMap; acl <- acls) {
             val binding = new AclBinding(
-              new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType),
+              new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType),
               new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
                 acl.permissionType.toJava))
 
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index ad55a6f..d5beae8 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -31,7 +31,7 @@ import kafka.server.ConfigType
 import kafka.utils.Logging
 import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.resource.ResourceNameType
+import org.apache.kafka.common.resource.PatternType
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
@@ -1010,7 +1010,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * @param resource resource pattern that has changed
    */
   def createAclChangeNotification(resource: Resource): Unit = {
-    val aclChange = ZkAclStore(resource.nameType).changeStore.createChangeNode(resource)
+    val aclChange = ZkAclStore(resource.patternType).changeStore.createChangeNode(resource)
     val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow
@@ -1064,22 +1064,22 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   }
 
   /**
-   * Gets the resource types, for which ACLs are stored, for the supplied resource name type.
-   * @param nameType The resource name type to retrieve the names for.
+   * Gets the resource types, for which ACLs are stored, for the supplied resource pattern type.
+   * @param patternType The resource pattern type to retrieve the names for.
    * @return list of resource type names
    */
-  def getResourceTypes(nameType: ResourceNameType): Seq[String] = {
-    getChildren(ZkAclStore(nameType).aclPath)
+  def getResourceTypes(patternType: PatternType): Seq[String] = {
+    getChildren(ZkAclStore(patternType).aclPath)
   }
 
   /**
-   * Gets the resource names, for which ACLs are stored, for a given resource type and name type
-   * @param nameType The resource name type to retrieve the names for.
+   * Gets the resource names, for which ACLs are stored, for a given resource type and pattern type
+   * @param patternType The resource pattern type to retrieve the names for.
    * @param resourceType Resource type to retrieve the names for.
    * @return list of resource names
    */
-  def getResourceNames(nameType: ResourceNameType, resourceType: ResourceType): Seq[String] = {
-    getChildren(ZkAclStore(nameType).path(resourceType))
+  def getResourceNames(patternType: PatternType, resourceType: ResourceType): Seq[String] = {
+    getChildren(ZkAclStore(patternType).path(resourceType))
   }
 
   /**
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 2cbdd80..d782ae0 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -33,7 +33,7 @@ import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.resource.ResourceNameType
+import org.apache.kafka.common.resource.PatternType
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.Time
@@ -451,7 +451,7 @@ object StateChangeHandlers {
 /**
   * Acls for resources are stored in ZK under two root paths:
   * <ul>
-  *   <li>[[org.apache.kafka.common.resource.ResourceNameType#LITERAL Literal]] patterns are stored under '/kafka-acl'.
+  *   <li>[[org.apache.kafka.common.resource.PatternType#LITERAL Literal]] patterns are stored under '/kafka-acl'.
   *   The format is JSON. See [[kafka.zk.ResourceZNode]] for details.</li>
   *   <li>All other patterns are stored under '/kafka-acl-extended/<i>pattern-type</i>'.
   *   The format is JSON. See [[kafka.zk.ResourceZNode]] for details.</li>
@@ -472,14 +472,14 @@ object StateChangeHandlers {
   *
   * Acl change events are also stored under two paths:
   * <ul>
-  *   <li>[[org.apache.kafka.common.resource.ResourceNameType#LITERAL Literal]] patterns are stored under '/kafka-acl-changes'.
+  *   <li>[[org.apache.kafka.common.resource.PatternType#LITERAL Literal]] patterns are stored under '/kafka-acl-changes'.
   *   The format is a UTF8 string in the form: &lt;resource-type&gt;:&lt;resource-name&gt;</li>
   *   <li>All other patterns are stored under '/kafka-acl-extended-changes'
   *   The format is JSON, as defined by [[kafka.zk.ExtendedAclChangeEvent]]</li>
   * </ul>
   */
 sealed trait ZkAclStore {
-  val patternType: ResourceNameType
+  val patternType: PatternType
   val aclPath: String
 
   def path(resourceType: ResourceType): String = s"$aclPath/$resourceType"
@@ -490,9 +490,11 @@ sealed trait ZkAclStore {
 }
 
 object ZkAclStore {
-  private val storesByType: Map[ResourceNameType, ZkAclStore] = ResourceNameType.values
-    .filter(nameType => nameType != ResourceNameType.ANY && nameType != ResourceNameType.UNKNOWN)
-    .map(nameType => (nameType, create(nameType)))
+  private val storesByType: Map[PatternType, ZkAclStore] = PatternType.values
+    .filter(patternType => patternType != PatternType.MATCH)
+    .filter(patternType => patternType != PatternType.ANY)
+    .filter(patternType => patternType != PatternType.UNKNOWN)
+    .map(patternType => (patternType, create(patternType)))
     .toMap
 
   val stores: Iterable[ZkAclStore] = storesByType.values
@@ -500,30 +502,30 @@ object ZkAclStore {
   val securePaths: Iterable[String] = stores
     .flatMap(store => Set(store.aclPath, store.changeStore.aclChangePath))
 
-  def apply(patternType: ResourceNameType): ZkAclStore = {
+  def apply(patternType: PatternType): ZkAclStore = {
     storesByType.get(patternType) match {
       case Some(store) => store
       case None => throw new KafkaException(s"Invalid pattern type: $patternType")
     }
   }
 
-  private def create(patternType: ResourceNameType) = {
+  private def create(patternType: PatternType) = {
     patternType match {
-      case ResourceNameType.LITERAL => LiteralAclStore
+      case PatternType.LITERAL => LiteralAclStore
       case _ => new ExtendedAclStore(patternType)
     }
   }
 }
 
 object LiteralAclStore extends ZkAclStore {
-  val patternType: ResourceNameType = ResourceNameType.LITERAL
+  val patternType: PatternType = PatternType.LITERAL
   val aclPath: String = "/kafka-acl"
 
   def changeStore: ZkAclChangeStore = LiteralAclChangeStore
 }
 
-class ExtendedAclStore(val patternType: ResourceNameType) extends ZkAclStore {
-  if (patternType == ResourceNameType.LITERAL)
+class ExtendedAclStore(val patternType: PatternType) extends ZkAclStore {
+  if (patternType == PatternType.LITERAL)
     throw new IllegalArgumentException("Literal pattern types are not supported")
 
   val aclPath: String = s"/kafka-acl-extended/${patternType.name.toLowerCase}"
@@ -579,7 +581,7 @@ case object LiteralAclChangeStore extends ZkAclChangeStore {
   val aclChangePath: String = "/kafka-acl-changes"
 
   def encode(resource: Resource): Array[Byte] = {
-    if (resource.nameType != ResourceNameType.LITERAL)
+    if (resource.patternType != PatternType.LITERAL)
       throw new IllegalArgumentException("Only literal resource patterns can be encoded")
 
     val legacyName = resource.resourceType + Resource.Separator + resource.name
@@ -589,7 +591,7 @@ case object LiteralAclChangeStore extends ZkAclChangeStore {
   def decode(bytes: Array[Byte]): Resource = {
     val string = new String(bytes, UTF_8)
     string.split(Separator, 2) match {
-        case Array(resourceType, resourceName, _*) => new Resource(ResourceType.fromString(resourceType), resourceName, ResourceNameType.LITERAL)
+        case Array(resourceType, resourceName, _*) => new Resource(ResourceType.fromString(resourceType), resourceName, PatternType.LITERAL)
         case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + string)
       }
   }
@@ -600,14 +602,14 @@ case object ExtendedAclChangeStore extends ZkAclChangeStore {
   val aclChangePath: String = "/kafka-acl-extended-changes"
 
   def encode(resource: Resource): Array[Byte] = {
-    if (resource.nameType == ResourceNameType.LITERAL)
+    if (resource.patternType == PatternType.LITERAL)
       throw new IllegalArgumentException("Literal pattern types are not supported")
 
     Json.encodeAsBytes(ExtendedAclChangeEvent(
       ExtendedAclChangeEvent.currentVersion,
       resource.resourceType.name,
       resource.name,
-      resource.nameType.name))
+      resource.patternType.name))
   }
 
   def decode(bytes: Array[Byte]): Resource = {
@@ -624,7 +626,7 @@ case object ExtendedAclChangeStore extends ZkAclChangeStore {
 }
 
 object ResourceZNode {
-  def path(resource: Resource): String = ZkAclStore(resource.nameType).path(resource.resourceType, resource.name)
+  def path(resource: Resource): String = ZkAclStore(resource.patternType).path(resource.resourceType, resource.name)
 
   def encode(acls: Set[Acl]): Array[Byte] = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
   def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
@@ -637,15 +639,15 @@ object ExtendedAclChangeEvent {
 case class ExtendedAclChangeEvent(@BeanProperty @JsonProperty("version") version: Int,
                                   @BeanProperty @JsonProperty("resourceType") resourceType: String,
                                   @BeanProperty @JsonProperty("name") name: String,
-                                  @BeanProperty @JsonProperty("resourceNameType") resourceNameType: String) {
+                                  @BeanProperty @JsonProperty("patternType") patternType: String) {
   if (version > ExtendedAclChangeEvent.currentVersion)
     throw new UnsupportedVersionException(s"Acl change event received for unsupported version: $version")
 
   def toResource: Try[Resource] = {
     for {
       resType <- Try(ResourceType.fromString(resourceType))
-      nameType <- Try(ResourceNameType.fromString(resourceNameType))
-      resource = Resource(resType, name, nameType)
+      patType <- Try(PatternType.fromString(patternType))
+      resource = Resource(resType, name, patType)
     } yield resource
   }
 }
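Note on the ZkData.scala hunks above: the doc comment describes two ZooKeeper roots for ACLs and two for change notifications, chosen by pattern type. The following is a small sketch of that mapping, using only the paths quoted in the diff. AclPaths and the Pattern hierarchy are hypothetical stand-ins for ZkAclStore and PatternType, and the resource type is passed as a plain string.

```scala
object AclPaths {
  // Hypothetical stand-in for the storable pattern types.
  sealed trait Pattern { def name: String }
  case object Literal extends Pattern { val name = "LITERAL" }
  case object Prefixed extends Pattern { val name = "PREFIXED" }

  // Literal ACLs keep the legacy root; every other pattern type gets
  // its own child under the extended root, named in lower case.
  def aclPath(p: Pattern): String = p match {
    case Literal => "/kafka-acl"
    case other   => s"/kafka-acl-extended/${other.name.toLowerCase}"
  }

  // Change notifications: one legacy path, one shared extended path.
  def changePath(p: Pattern): String = p match {
    case Literal => "/kafka-acl-changes"
    case _       => "/kafka-acl-extended-changes"
  }

  // Same shape as ZkAclStore.path(resourceType) in the diff.
  def typePath(p: Pattern, resourceType: String): String =
    s"${aclPath(p)}/$resourceType"
}
```

Keeping literal ACLs under the original root means existing znodes stay where older brokers expect them, while new pattern types are isolated under the extended root.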
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index d6f349c..6a81812 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
-import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern, ResourceType}
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
 import org.junit.rules.Timeout
 import org.junit.Assert._
 
@@ -1011,7 +1011,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     checkInvalidAlterConfigs(zkClient, servers, client)
   }
 
-  val ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
+  val ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
 
   /**
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c1923dc..c5df68b 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records,
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.resource.ResourceNameType.LITERAL
+import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests}
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 1f89ea3..88e19a1 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.{GroupAuthorizationException, TimeoutException, TopicAuthorizationException}
-import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -152,7 +152,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
                                           s"--add",
                                           s"--topic=$topicPrefix",
                                           s"--group=$groupPrefix",
-                                          s"--resource-name-type=prefixed",
+                                          s"--resource-pattern-type=prefixed",
                                           s"--consumer",
                                           s"--producer",
                                           s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 46bf722..9da6937 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -21,7 +21,7 @@ import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
 import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
-import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern, ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.junit.Assert.assertEquals
 import org.junit.{After, Assert, Before, Test}
@@ -89,19 +89,19 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     closeSasl()
   }
 
-  val anyAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", ResourceNameType.LITERAL),
+  val anyAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
-  val acl2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL),
+  val acl2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic2", PatternType.LITERAL),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
-  val acl3 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
+  val acl3 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
-  val fooAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL),
+  val fooAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foobar", PatternType.LITERAL),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
-  val prefixAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED),
+  val prefixAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic", PatternType.PREFIXED),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
-  val transactionalIdAcl = new AclBinding(new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "transactional_id", ResourceNameType.LITERAL),
+  val transactionalIdAcl = new AclBinding(new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "transactional_id", PatternType.LITERAL),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
-  val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", ResourceNameType.LITERAL),
+  val groupAcl = new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
 
   @Test
@@ -111,7 +111,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     val results = client.createAcls(List(acl2, acl3).asJava)
     assertEquals(Set(acl2, acl3), results.values.keySet().asScala)
     results.values.values().asScala.foreach(value => value.get)
-    val aclUnknown = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
+    val aclUnknown = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW))
     val results2 = client.createAcls(List(aclUnknown).asJava)
     assertEquals(Set(aclUnknown), results2.values.keySet().asScala)
@@ -132,9 +132,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
     waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
 
-    val filterA = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
-    val filterB = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
-    val filterC = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TRANSACTIONAL_ID, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val filterA = new AclBindingFilter(new ResourcePatternFilter(ResourceType.GROUP, null, PatternType.LITERAL), AccessControlEntryFilter.ANY)
+    val filterB = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", PatternType.LITERAL), AccessControlEntryFilter.ANY)
+    val filterC = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TRANSACTIONAL_ID, null, PatternType.LITERAL), AccessControlEntryFilter.ANY)
 
     waitForDescribeAcls(client, filterA, Set(groupAcl))
     waitForDescribeAcls(client, filterC, Set(transactionalIdAcl))
@@ -154,13 +154,13 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     client = AdminClient.create(createConfig())
     ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
 
-    val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY)
-    val allLiteralTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
-    val allPrefixedTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY)
-    val literalMyTopic2Acls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
-    val prefixedMyTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY)
-    val allMyTopic2Acls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.ANY), AccessControlEntryFilter.ANY)
-    val allFooTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foobar", ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+    val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.ANY), AccessControlEntryFilter.ANY)
+    val allLiteralTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.LITERAL), AccessControlEntryFilter.ANY)
+    val allPrefixedTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.PREFIXED), AccessControlEntryFilter.ANY)
+    val literalMyTopic2Acls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", PatternType.LITERAL), AccessControlEntryFilter.ANY)
+    val prefixedMyTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic", PatternType.PREFIXED), AccessControlEntryFilter.ANY)
+    val allMyTopic2Acls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", PatternType.MATCH), AccessControlEntryFilter.ANY)
+    val allFooTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foobar", PatternType.MATCH), AccessControlEntryFilter.ANY)
 
     assertEquals(Set(anyAcl), getAcls(anyAcl.toFilter))
     assertEquals(Set(prefixAcl), getAcls(prefixAcl.toFilter))
@@ -181,9 +181,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     client = AdminClient.create(createConfig())
     ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
 
-    val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY)
-    val allLiteralTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
-    val allPrefixedTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY)
+    val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.MATCH), AccessControlEntryFilter.ANY)
+    val allLiteralTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.LITERAL), AccessControlEntryFilter.ANY)
+    val allPrefixedTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.PREFIXED), AccessControlEntryFilter.ANY)
 
     // Delete only ACLs on literal 'mytopic2' topic
     var deleted = client.deleteAcls(List(acl2.toFilter).asJava).all().get().asScala.toSet
@@ -231,11 +231,11 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     client = AdminClient.create(createConfig())
     ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))  // <-- prefixed exists, but should never be returned.
 
-    val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY)
-    val legacyAllTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
-    val legacyMyTopic2Acls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
-    val legacyAnyTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "*", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
-    val legacyFooTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+    val allTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.MATCH), AccessControlEntryFilter.ANY)
+    val legacyAllTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.LITERAL), AccessControlEntryFilter.ANY)
+    val legacyMyTopic2Acls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "mytopic2", PatternType.LITERAL), AccessControlEntryFilter.ANY)
+    val legacyAnyTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "*", PatternType.LITERAL), AccessControlEntryFilter.ANY)
+    val legacyFooTopicAcls = new AclBindingFilter(new ResourcePatternFilter(ResourceType.TOPIC, "foobar", PatternType.LITERAL), AccessControlEntryFilter.ANY)
 
     assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(legacyAllTopicAcls))
     assertEquals(Set(acl2), getAcls(legacyMyTopic2Acls))
@@ -266,9 +266,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
   @Test
   def testAttemptToCreateInvalidAcls(): Unit = {
     client = AdminClient.create(createConfig())
-    val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", ResourceNameType.LITERAL),
+    val clusterAcl = new AclBinding(new ResourcePattern(ResourceType.CLUSTER, "foobar", PatternType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
-    val emptyResourceNameAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "", ResourceNameType.LITERAL),
+    val emptyResourceNameAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "", PatternType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
     val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions())
     assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.values.keySet().asScala)
@@ -336,7 +336,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def testAclGet(expectAuth: Boolean): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val userAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", ResourceNameType.LITERAL),
+      val userAcl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
         new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
       val results = client.describeAcls(userAcl.toFilter)
       if (expectAuth) {
diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
index c7ed949..1b7a833 100644
--- a/core/src/test/scala/kafka/security/auth/ResourceTest.scala
+++ b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
@@ -18,7 +18,7 @@
 package kafka.security.auth
 
 import kafka.common.KafkaException
-import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.junit.Test
 import org.junit.Assert._
 
diff --git a/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala b/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala
index 4e8580b..ee6c399 100644
--- a/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala
+++ b/core/src/test/scala/kafka/zk/ExtendedAclStoreTest.scala
@@ -18,7 +18,7 @@
 package kafka.zk
 
 import kafka.security.auth.{Resource, Topic}
-import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
diff --git a/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala b/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala
index 22d6f23..ec5de1b 100644
--- a/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala
+++ b/core/src/test/scala/kafka/zk/LiteralAclStoreTest.scala
@@ -17,8 +17,10 @@
 
 package kafka.zk
 
-import kafka.security.auth.{Resource, Topic}
-import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import kafka.security.auth.{Group, Resource, Topic}
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
@@ -59,4 +61,14 @@ class LiteralAclStoreTest {
 
     assertEquals(literalResource, actual)
   }
+
+  @Test
+  def shouldDecodeResourceUsingTwoPartLogic(): Unit = {
+    val resource = Resource(Group, "PREFIXED:this, including the PREFIXED part, is a valid two part group name", LITERAL)
+    val encoded = (resource.resourceType +  Resource.Separator + resource.name).getBytes(UTF_8)
+
+    val actual = store.changeStore.decode(encoded)
+
+    assertEquals(resource, actual)
+  }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 76cf787..26ae073 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -23,7 +23,7 @@ import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.{Before, Test}
 
@@ -133,7 +133,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def testAclsOnPrefixedResources(): Unit = {
-    val cmd = Array("--allow-principal", principal.toString, "--producer", "--topic", "Test-", "--resource-name-type", "Prefixed")
+    val cmd = Array("--allow-principal", principal.toString, "--producer", "--topic", "Test-", "--resource-pattern-type", "Prefixed")
 
     AclCommand.main(zkArgs ++ cmd :+ "--add")
 
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 58f0962..0462300 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -19,7 +19,7 @@ package kafka.common
 import kafka.security.auth.{Group, Resource}
 import kafka.utils.TestUtils
 import kafka.zk.{LiteralAclChangeStore, LiteralAclStore, ZkAclChangeStore, ZooKeeperTestHarness}
-import org.apache.kafka.common.resource.ResourceNameType.LITERAL
+import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.junit.{After, Before, Test}
 
 import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index b301271..7ab3c0a 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -28,8 +28,8 @@ import kafka.utils.TestUtils
 import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
 import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
 import org.apache.kafka.common.errors.UnsupportedVersionException
-import org.apache.kafka.common.resource.ResourceNameType
-import org.apache.kafka.common.resource.ResourceNameType.{LITERAL, PREFIXED}
+import org.apache.kafka.common.resource.PatternType
+import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
 import org.junit.Assert._
@@ -37,23 +37,21 @@ import org.junit.{After, Before, Test}
 
 class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
-  val allowReadAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Read)
-  val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
-  val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
-
-  val wildCardResource = Resource(Topic, WildCardResource, LITERAL)
-  val prefixedResource = Resource(Topic, "foo", PREFIXED)
-
-  val simpleAclAuthorizer = new SimpleAclAuthorizer
-  val simpleAclAuthorizer2 = new SimpleAclAuthorizer
-  val testPrincipal = Acl.WildCardPrincipal
-  val testHostName = InetAddress.getByName("192.168.0.1")
-  var resource: Resource = null
-  val superUsers = "User:superuser1; User:superuser2"
-  val username = "alice"
-  val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-  val session = Session(principal, testHostName)
-  var config: KafkaConfig = _
+  private val allowReadAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Read)
+  private val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
+  private val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
+
+  private val wildCardResource = Resource(Topic, WildCardResource, LITERAL)
+  private val prefixedResource = Resource(Topic, "foo", PREFIXED)
+
+  private val simpleAclAuthorizer = new SimpleAclAuthorizer
+  private val simpleAclAuthorizer2 = new SimpleAclAuthorizer
+  private var resource: Resource = _
+  private val superUsers = "User:superuser1; User:superuser2"
+  private val username = "alice"
+  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+  private val session = Session(principal, InetAddress.getByName("192.168.0.1"))
+  private var config: KafkaConfig = _
   private var zooKeeperClient: ZooKeeperClient = _
 
   @Before
@@ -545,23 +543,24 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
   @Test
   def testGetAclsPrincipal(): Unit = {
-    assertEquals(0, simpleAclAuthorizer.getAcls(principal).size)
+    val aclOnSpecificPrincipal = new Acl(principal, Allow, WildCardHost, Write)
+    simpleAclAuthorizer.addAcls(Set[Acl](aclOnSpecificPrincipal), resource)
 
-    val acl1 = new Acl(principal, Allow, WildCardHost, Write)
-    simpleAclAuthorizer.addAcls(Set[Acl](acl1), resource)
-    assertEquals(1, simpleAclAuthorizer.getAcls(principal).size)
+    assertEquals("acl on specific should not be returned for wildcard request",
+      0, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size)
+    assertEquals("acl on specific should be returned for specific request",
+      1, simpleAclAuthorizer.getAcls(principal).size)
+    assertEquals("acl on specific should be returned for different principal instance",
+      1, simpleAclAuthorizer.getAcls(new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size)
 
-    simpleAclAuthorizer.addAcls(Set[Acl](acl1), Resource(Topic, Acl.WildCardResource, LITERAL))
-    assertEquals(2, simpleAclAuthorizer.getAcls(principal).size)
-
-    val acl2 = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
-    simpleAclAuthorizer.addAcls(Set[Acl](acl1), Resource(Group, "groupA", LITERAL))
-    assertEquals(3, simpleAclAuthorizer.getAcls(principal).size)
+    simpleAclAuthorizer.removeAcls(resource)
+    val aclOnWildcardPrincipal = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
+    simpleAclAuthorizer.addAcls(Set[Acl](aclOnWildcardPrincipal), resource)
 
-    // add prefixed principal acl on wildcard group name
-    val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName.charAt(0) + WildCardResource), Allow, WildCardHost, Write)
-    simpleAclAuthorizer.addAcls(Set[Acl](acl1), Resource(Group, Acl.WildCardResource, LITERAL))
-    assertEquals(4, simpleAclAuthorizer.getAcls(principal).size)
+    assertEquals("acl on wildcard should be returned for wildcard request",
+      1, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size)
+    assertEquals("acl on wildcard should not be returned for specific request",
+      0, simpleAclAuthorizer.getAcls(principal).size)
   }
 
   @Test(expected = classOf[UnsupportedVersionException])
@@ -634,7 +633,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     simpleAclAuthorizer.configure(config.originals)
   }
 
-  private def getAclChangeEventAsString(patternType: ResourceNameType) = {
+  private def getAclChangeEventAsString(patternType: PatternType) = {
     val store = ZkAclStore(patternType)
     val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.changeStore.aclChangePath))
     children.maybeThrow()
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index 7df30c9..da7a22a 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -28,7 +28,7 @@ import kafka.server.{CreateTokenResult, Defaults, DelegationTokenManager, KafkaC
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.resource.ResourceNameType.LITERAL
+import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index f503546..0205bcf 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -25,13 +25,12 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.protocol.types.Struct
-import org.apache.kafka.common.resource.ResourceNameType
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests._
@@ -314,12 +313,12 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.CREATE_ACLS =>
           new CreateAclsRequest.Builder(Collections.singletonList(new AclCreation(new AclBinding(
-            new ResourcePattern(AdminResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
+            new ResourcePattern(AdminResourceType.TOPIC, "mytopic", PatternType.LITERAL),
             new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY)))))
 
         case ApiKeys.DELETE_ACLS =>
           new DeleteAclsRequest.Builder(Collections.singletonList(new AclBindingFilter(
-            new ResourcePatternFilter(AdminResourceType.TOPIC, null, ResourceNameType.LITERAL),
+            new ResourcePatternFilter(AdminResourceType.TOPIC, null, PatternType.LITERAL),
             new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY))))
 
         case ApiKeys.DESCRIBE_CONFIGS =>
diff --git a/docs/security.html b/docs/security.html
index 41111f7..34ceeba 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1024,7 +1024,7 @@
     <h3><a id="security_authz" href="#security_authz">7.4 Authorization and ACLs</a></h3>
     Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses zookeeper to store all the acls. The Authorizer is configured by setting <code>authorizer.class.name</code> in server.properties. To enable the out of the box implementation use:
     <pre>authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer</pre>
-    Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R". You can read more about the acl structure on KIP-11. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if a Resource R has no associated acls, no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in server.properties.
+    Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". You can read more about the acl structure in KIP-11 and resource patterns in KIP-290. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated acls, and therefore no one other than super users is allowed to access R. If you want [...]
     <pre>allow.everyone.if.no.acl.found=true</pre>
     One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma).
     <pre>super.users=User:Bob;User:Alice</pre>
@@ -1085,31 +1085,30 @@
         </tr>
         <tr>
             <td>--cluster</td>
-            <td>Specifies cluster as resource.</td>
+            <td>Indicates to the script that the user is trying to interact with acls on the singular cluster resource.</td>
             <td></td>
-            <td>Resource</td>
+            <td>ResourcePattern</td>
         </tr>
         <tr>
             <td>--topic [topic-name]</td>
-            <td>Specifies the topic as resource.</td>
+            <td>Indicates to the script that the user is trying to interact with acls on topic resource pattern(s).</td>
             <td></td>
-            <td>Resource</td>
+            <td>ResourcePattern</td>
         </tr>
         <tr>
             <td>--group [group-name]</td>
-            <td>Specifies the consumer-group as resource.</td>
+            <td>Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s)</td>
             <td></td>
-            <td>Resource</td>
+            <td>ResourcePattern</td>
         </tr>
         <tr>
-            <td>--resource-name-type [name-type]</td>
-            <td>Specifies the resource name type to use.<br>
-                Valid values are:<br>
-                <ul>
-                    <li><b>Literal</b> Match resource names exactly or, in the case of the Wildcard name '*', match all resources.</li>
-                    <li><b>Prefixed</b> Match any resource whose name starts with the prefix.</li>
-                    <li><b>All</b> (list|remove only) Matching any name type, including the Wildcard name.</li>
-                </ul>
+            <td>--resource-pattern-type [pattern-type]</td>
+            <td>Indicates to the script the type of resource pattern, (for --add), or resource pattern filter, (for --list and --remove), the user wishes to use.<br>
+                When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'.<br>
+                When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern,
+                or the filter values of 'any' or 'match' can be used, where 'any' will match any pattern type, but will match the resource name exactly,
+                and 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s).<br>
+                WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.
             </td>
             <td>literal</td>
             <td>Configuration</td>
@@ -1175,27 +1174,31 @@
             By default, all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal we will have to use the --deny-principal and --deny-host option. For example, if we want to allow all users to Read from Test-topic but only deny User:BadBob from IP 198.51.100.3 we can do so using following commands:
             <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic</pre>
             Note that ``--allow-host`` and ``deny-host`` only support IP addresses (hostnames are not supported).
-            Above examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly user can add acls to cluster by specifying --cluster and to a consumer group by specifying --group [group-name].</li>
+            Above examples add acls to a topic by specifying --topic [topic-name] as the resource pattern option. Similarly user can add acls to cluster by specifying --cluster and to a consumer group by specifying --group [group-name].
             You can add acls on any resource of a certain type, e.g. suppose you wanted to add an acl "Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.0"
             You can do that by using the wildcard resource '*', e.g. by executing the CLI with following options:
             <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic *</pre>
-            You can add acls on resources matching a certain prefix, e.g. suppose you want to add an acl "Principal User:Jane is allowed to produce to any Topic whose name is prefixed with 'Test-' from any host".
+            You can add acls on prefixed resource patterns, e.g. suppose you want to add an acl "Principal User:Jane is allowed to produce to any Topic whose name starts with 'Test-' from any host".
             You can do that by executing the CLI with following options:
-            <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-name-type Prefixed</pre>
-            Note, --resource-name-type defaults to 'literal', which only affects resources with the exact same name. The exception to this is the wildcard resource name '*', which should also be added using 'literal'.
+            <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed</pre>
+            Note, --resource-pattern-type defaults to 'literal', which only affects resources with the exact same name or, in the case of the wildcard resource name '*', a resource with any name.</li>
 
         <li><b>Removing Acls</b><br>
                 Removing acls is pretty much the same. The only difference is instead of --add option users will have to specify --remove option. To remove the acls added by the first example above we can execute the CLI with following options:
-            <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic </pre></li>
-            If you wan to remove the prefixed acl added above we can execute the CLI with following options:
-            <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-name-type Prefixed</pre></li>
+            <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic </pre>
+            If you want to remove the acl added to the prefixed resource pattern above we can execute the CLI with following options:
+            <pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed</pre></li>
 
         <li><b>List Acls</b><br>
-                We can list acls for any resource by specifying the --list option with the resource. To list all acls for Test-topic we can execute the CLI with following options:
-                <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic</pre></li>
-                However, this will only return the acls that have been added to this exact resource. Other acls can exist that affect access to the topic,
-                e.g. any acls on the topic wildcard '*', or any acls on resources matching a certain prefix. To list all acls affecting a topic we can use the '--resource-name-type any' option, e.g.
-                <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-name-type any</pre></li>
+                We can list acls for any resource by specifying the --list option with the resource. To list all acls on the literal resource pattern Test-topic, we can execute the CLI with following options:
+                <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic</pre>
+                However, this will only return the acls that have been added to this exact resource pattern. Other acls can exist that affect access to the topic,
+                e.g. any acls on the topic wildcard '*', or any acls on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly:
+                <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic *</pre>
+                However, it is not necessarily possible to explicitly query for acls on prefixed resource patterns that match Test-topic as the name of such patterns may not be known.
+                We can list <i>all</i> acls affecting Test-topic by using '--resource-pattern-type match', e.g.
+                <pre class="brush: bash;">bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match</pre>
+                This will list acls on all matching literal, wildcard and prefixed resource patterns.</li>
 
         <li><b>Adding or removing a principal as producer or consumer</b><br>
                 The most common use case for acl management are adding/removing a principal as producer or consumer so we added convenience options to handle these cases. In order to add User:Bob as a producer of  Test-topic we can execute the following command:
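
The pattern-type filter behaviour described in the security.html changes above ('literal', 'prefixed', 'any' and 'match') can be illustrated with a small, self-contained Java model. This is only a sketch under stated assumptions, not Kafka's implementation: the class PatternMatchSketch, its nested Pattern type and the selects/select/sampleStore helpers are hypothetical names introduced here, the resource type is left out for brevity, and the rules are inferred from the commit description and the documentation text in this diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of pattern-type filtering; not Kafka code.
final class PatternMatchSketch {
    enum PatternType { LITERAL, PREFIXED, ANY, MATCH }

    static final String WILDCARD = "*";

    // A stored resource pattern: a name plus a concrete type (LITERAL or PREFIXED).
    static final class Pattern {
        final String name;
        final PatternType type;

        Pattern(String name, PatternType type) {
            this.name = name;
            this.type = type;
        }

        @Override
        public String toString() {
            return type + ":" + name;
        }
    }

    // Returns true if a filter with the given name and pattern type selects the stored pattern.
    // A null filter name means "any name", as in the tests in this diff.
    static boolean selects(String filterName, PatternType filterType, Pattern stored) {
        switch (filterType) {
            case ANY:
                // Ignore the pattern type, but compare names exactly (no pattern matching).
                return filterName == null || filterName.equals(stored.name);
            case MATCH:
                // Select every pattern that would affect a resource with this name.
                if (filterName == null) {
                    return true;
                }
                if (stored.type == PatternType.PREFIXED) {
                    return filterName.startsWith(stored.name);
                }
                return stored.name.equals(WILDCARD) || stored.name.equals(filterName);
            default:
                // LITERAL or PREFIXED: the type must be identical and the name must be equal.
                return stored.type == filterType
                    && (filterName == null || filterName.equals(stored.name));
        }
    }

    // Applies the filter to a list of stored patterns and returns the selected ones as strings.
    static List<String> select(List<Pattern> store, String filterName, PatternType filterType) {
        List<String> out = new ArrayList<>();
        for (Pattern p : store) {
            if (selects(filterName, filterType, p)) {
                out.add(p.toString());
            }
        }
        return out;
    }

    // Made-up sample data, used only for illustration.
    static List<Pattern> sampleStore() {
        return Arrays.asList(
            new Pattern("Test-topic", PatternType.LITERAL),
            new Pattern(WILDCARD, PatternType.LITERAL),
            new Pattern("Test-", PatternType.PREFIXED),
            new Pattern("Test-topic", PatternType.PREFIXED),
            new Pattern("Other-topic", PatternType.LITERAL));
    }

    public static void main(String[] args) {
        List<Pattern> store = sampleStore();
        System.out.println("literal: " + select(store, "Test-topic", PatternType.LITERAL));
        System.out.println("any:     " + select(store, "Test-topic", PatternType.ANY));
        System.out.println("match:   " + select(store, "Test-topic", PatternType.MATCH));
    }
}
```

With this sample data, a 'literal' filter on Test-topic selects only the literal pattern of that name, 'any' also selects the prefixed pattern whose name is exactly Test-topic, and 'match' additionally selects the wildcard pattern and the Test- prefix. The breadth of 'match' is why the documentation warns about combining it with --remove.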
