You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2016/03/22 07:24:01 UTC

[04/11] incubator-sentry git commit: SENTRY-1029: Address review comments for Kafka model that came after patch got committed. (Ashish K Singh, reviewed by Hao Hao, via Anne Yu)

SENTRY-1029: Address review comments for Kafka model that came after patch got committed. (Ashish K Singh, reviewed by Hao Hao, via Anne Yu)


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/184a32d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/184a32d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/184a32d6

Branch: refs/heads/master
Commit: 184a32d68fd319ff00e187d5d3558bc330d3c560
Parents: 7ce0373
Author: Anne Yu <an...@cloudera.com>
Authored: Tue Feb 16 15:02:55 2016 -0800
Committer: hahao <ha...@cloudera.com>
Committed: Mon Mar 21 23:13:30 2016 -0700

----------------------------------------------------------------------
 dev-support/smart-apply-patch.sh                |   0
 .../sentry/kafka/binding/KafkaAuthBinding.java  |   2 +-
 .../src/test/resources/test-authz-provider.ini  |   4 +-
 .../apache/sentry/core/model/kafka/Cluster.java |  29 +++--
 .../sentry/core/model/kafka/ConsumerGroup.java  |  24 +++-
 .../apache/sentry/core/model/kafka/Host.java    |  26 ++++-
 .../core/model/kafka/KafkaActionConstant.java   |   6 +-
 .../core/model/kafka/KafkaActionFactory.java    | 109 +++++++++++++++++--
 .../core/model/kafka/KafkaAuthorizable.java     |  30 ++++-
 .../apache/sentry/core/model/kafka/Topic.java   |  24 +++-
 .../core/model/kafka/TestKafkaAction.java       |  27 ++---
 .../core/model/kafka/TestKafkaAuthorizable.java |   5 +-
 .../policy/kafka/KafkaModelAuthorizables.java   |   2 +-
 .../policy/kafka/KafkaPrivilegeValidator.java   |  97 +++++++++++++----
 .../policy/kafka/KafkaWildcardPrivilege.java    |  29 +++--
 .../kafka/TestKafkaModelAuthorizables.java      |  18 +++
 .../kafka/TestKafkaPrivilegeValidator.java      |  75 +++++++++++--
 .../kafka/TestKafkaWildcardPrivilege.java       |   4 +
 .../engine/AbstractTestKafkaPolicyEngine.java   |   4 +-
 .../kafka/provider/TestKafkaPolicyNegative.java |   2 +-
 .../src/test/resources/test-authz-provider.ini  |   4 +-
 21 files changed, 419 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/dev-support/smart-apply-patch.sh
----------------------------------------------------------------------
diff --git a/dev-support/smart-apply-patch.sh b/dev-support/smart-apply-patch.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index ccbe60e..9e72d78 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -50,7 +50,7 @@ public class KafkaAuthBinding {
     private final AuthorizationProvider authProvider;
     private ProviderBackend providerBackend;
 
-    private final KafkaActionFactory actionFactory = new KafkaActionFactory();
+    private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
 
     public KafkaAuthBinding(Configuration authConf) throws Exception {
         this.authConf = authConf;

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini b/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
index 5f85382..520e1d0 100644
--- a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
+++ b/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini
@@ -27,8 +27,8 @@ producer2 = producer_t2_host2
 consumer_producer0 = consumer_producer_t1
 
 [roles]
-admin_all = host=*
-admin_host1 = host=1.2.3.4
+admin_all = host=*->action=all
+admin_host1 = host=1.2.3.4->action=all
 consumer_t1_all = host=*->topic=t1->action=read
 consumer_t1_host1 = host=host1->topic=t1->action=read
 consumer_t2_host2 = host=host2->topic=t2->action=read

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
index b1fc063..bb30b1b 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
@@ -17,32 +17,47 @@
 package org.apache.sentry.core.model.kafka;
 
 /**
- * Represents the Cluster authorizable in the Kafka model
+ * Represents Cluster authorizable in Kafka model.
  */
 public class Cluster implements KafkaAuthorizable {
+  private String name;
+
   /**
-   * Represents all clusters
+   * Create a Cluster authorizable for Kafka cluster of a given name.
+   *
+   * @param name Name of Kafka cluster.
    */
-  public static final Cluster ALL = new Cluster(KafkaAuthorizable.ALL);
-
-  private String name;
   public Cluster(String name) {
     this.name = name;
   }
 
+  /**
+   * Get type of Kafka's cluster authorizable.
+   *
+   * @return Type of Kafka's cluster authorizable.
+   */
   @Override
   public AuthorizableType getAuthzType() {
     return AuthorizableType.CLUSTER;
   }
 
+  /**
+   * Get name of Kafka's cluster.
+   *
+   * @return Name of Kafka's cluster.
+   */
   @Override
   public String getName() {
     return name;
   }
 
+  /**
+   * Get type name of Kafka's cluster authorizable.
+   *
+   * @return Type name of Kafka's cluster authorizable.
+   */
   @Override
   public String getTypeName() {
     return getAuthzType().name();
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/ConsumerGroup.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/ConsumerGroup.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/ConsumerGroup.java
index 9525aaf..5fc4e8c 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/ConsumerGroup.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/ConsumerGroup.java
@@ -19,26 +19,42 @@ package org.apache.sentry.core.model.kafka;
  * Represents the ConsumerGroup authorizable in the Kafka model
  */
 public class ConsumerGroup implements KafkaAuthorizable {
+  private String name;
+
   /**
-   * Represents all consumer groups
+   * Create a Consumer-Group authorizable for Kafka cluster of a given name.
+   *
+   * @param name Name of Consumer-Group in a Kafka cluster.
    */
-  public static ConsumerGroup ALL = new ConsumerGroup(KafkaAuthorizable.ALL);
-
-  private String name;
   public ConsumerGroup(String name) {
     this.name = name;
   }
 
+  /**
+   * Get type of Kafka's consumer-group authorizable.
+   *
+   * @return Type of Kafka's consumer-group authorizable.
+   */
   @Override
   public AuthorizableType getAuthzType() {
     return AuthorizableType.CONSUMERGROUP;
   }
 
+  /**
+   * Get name of Kafka's consumer-group.
+   *
+   * @return Name of Kafka's consumer-group.
+   */
   @Override
   public String getName() {
     return name;
   }
 
+  /**
+   * Get type name of Kafka's consumer-group authorizable.
+   *
+   * @return Type name of Kafka's consumer-group authorizable.
+   */
   @Override
   public String getTypeName() {
     return getAuthzType().name();

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Host.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Host.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Host.java
index e0f4160..48a18f6 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Host.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Host.java
@@ -17,29 +17,45 @@
 package org.apache.sentry.core.model.kafka;
 
 /**
- * Represents the Host authorizable in the Kafka model
+ * Represents Host authorizable in Kafka model
  */
 public class Host implements KafkaAuthorizable {
+  private String name;
+
   /**
-   * Represents all hosts
+   * Create a Kafka's Host authorizable of a given string representation.
+   *
+   * @param name String representation of host.
    */
-  public static Host ALL = new Host(KafkaAuthorizable.ALL);
-
-  private String name;
   public Host(String name) {
     this.name = name;
   }
 
+  /**
+   * Get authorizable type of Host authorizable.
+   *
+   * @return Type of Host authorizable.
+   */
   @Override
   public AuthorizableType getAuthzType() {
     return AuthorizableType.HOST;
   }
 
+  /**
+   * Get name of Kafka's host authorizable.
+   *
+   * @return Name of Kafka's host authorizable.
+   */
   @Override
   public String getName() {
     return name;
   }
 
+  /**
+   * Get type name of Kafka's host authorizable.
+   *
+   * @return Type name of Kafka's host authorizable.
+   */
   @Override
   public String getTypeName() {
     return getAuthzType().name();

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
index 13421f9..17d7fb7 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionConstant.java
@@ -16,10 +16,12 @@
  */
 package org.apache.sentry.core.model.kafka;
 
+/**
+ * Actions supported by Kafka on its authorizable resources.
+ */
 public class KafkaActionConstant {
 
-  public static final String ALL = "*";
-  public static final String ALL_NAME = "ALL";
+  public static final String ALL = "ALL";
   public static final String READ = "read";
   public static final String WRITE = "write";
   public static final String CREATE = "create";

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
index 2577406..7b8b518 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java
@@ -14,6 +14,7 @@
  */
 package org.apache.sentry.core.model.kafka;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.sentry.core.common.BitFieldAction;
@@ -21,44 +22,108 @@ import org.apache.sentry.core.common.BitFieldActionFactory;
 
 import com.google.common.collect.Lists;
 
+/**
+ * Factory for creating actions supported by Kafka.
+ */
 public class KafkaActionFactory extends BitFieldActionFactory {
+  private static KafkaActionFactory instance;
+  private KafkaActionFactory() {}
+
+  /**
+   * Get instance of KafkaActionFactory, which is a singleton.
+   *
+   * @return Instance of KafkaActionFactory.
+   */
+  public static KafkaActionFactory getInstance() {
+    if (instance == null) {
+      instance = new KafkaActionFactory();
+    }
+
+    return instance;
+  }
 
-  enum KafkaActionType {
+  /**
+   * Types of actions supported by Kafka.
+   */
+  public enum KafkaActionType {
     READ(KafkaActionConstant.READ, 1),
     WRITE(KafkaActionConstant.WRITE, 2),
     CREATE(KafkaActionConstant.CREATE, 4),
     DELETE(KafkaActionConstant.DELETE, 8),
     ALTER(KafkaActionConstant.ALTER, 16),
     DESCRIBE(KafkaActionConstant.DESCRIBE, 32),
-    ADMIN(KafkaActionConstant.CLUSTER_ACTION, 64),
+    CLUSTER_ACTION(KafkaActionConstant.CLUSTER_ACTION, 64),
     ALL(KafkaActionConstant.ALL, READ.getCode() | WRITE.getCode() | CREATE.getCode()
-        | DELETE.getCode() | ALTER.getCode()| DESCRIBE.getCode() | ADMIN.getCode());
+        | DELETE.getCode() | ALTER.getCode()| DESCRIBE.getCode() | CLUSTER_ACTION.getCode());
 
     private String name;
     private int code;
 
+    /**
+     * Create Kafka action type based on provided name and code.
+     *
+     * @param name Name of Kafka action.
+     * @param code Integer representation of Kafka action's code.
+     */
     KafkaActionType(String name, int code) {
       this.name = name;
       this.code = code;
     }
 
+    /**
+     * Get code for this Kafka's action.
+     *
+     * @return Code for this Kafka's action.
+     */
     public int getCode() {
       return code;
     }
 
+    /**
+     * Get name of this Kafka's action.
+     *
+     * @return Name of this Kafka's action.
+     */
     public String getName() {
       return name;
     }
 
+    /**
+     * Check if Kafka action type with {@code name} as string representation exists.
+     *
+     * @param name String representation of a valid Kafka action type.
+     * @return True if Kafka action type with {@code name} as string representation exists, false otherwise.
+     */
+    static boolean hasActionType(String name) {
+      for (KafkaActionType action : KafkaActionType.values()) {
+        if (action.name.equalsIgnoreCase(name)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Get Kafka's action type matching the provided name (case-insensitive).
+     *
+     * @param name String representation of Kafka's action type.
+     * @return Kafka's action type matching the provided name, if such action type is found, else null.
+     */
     static KafkaActionType getActionByName(String name) {
       for (KafkaActionType action : KafkaActionType.values()) {
         if (action.name.equalsIgnoreCase(name)) {
           return action;
         }
       }
-      throw new RuntimeException("can't get ActionType by name:" + name);
+      return null; // Can't get ActionType of provided kafkaAction
     }
 
+    /**
+     * Create Kafka's action types represented by provided code.
+     *
+     * @param code Integer representation of Kafka's action types.
+     * @return List of Kafka's action types represented by provided code; an empty list if no action types are found.
+     */
     static List<KafkaActionType> getActionByCode(int code) {
       List<KafkaActionType> actions = Lists.newArrayList();
       for (KafkaActionType action : KafkaActionType.values()) {
@@ -68,22 +133,41 @@ public class KafkaActionFactory extends BitFieldActionFactory {
         }
       }
       if (actions.isEmpty()) {
-        throw new RuntimeException("can't get ActionType by code:" + code);
+        return Arrays.asList();
       }
       return actions;
     }
   }
 
+  /**
+   * Kafka Action
+   */
   public static class KafkaAction extends BitFieldAction {
+    /**
+     * Create Kafka action based on provided name.
+     *
+     * @param name Name of Kafka action.
+     */
     public KafkaAction(String name) {
       this(KafkaActionType.getActionByName(name));
     }
 
+    /**
+     * Create Kafka action based on provided Kafka action type.
+     *
+     * @param actionType Type of Kafka action for which action has to be created.
+     */
     public KafkaAction(KafkaActionType actionType) {
-      super(actionType.name, actionType.code);
+      super(actionType.name(), actionType.getCode());
     }
   }
 
+  /**
+   * Get Kafka actions represented by provided action code.
+   *
+   * @param actionCode Integer code for required Kafka actions.
+   * @return List of Kafka actions represented by provided action code.
+   */
   @Override
   public List<KafkaAction> getActionsByCode(int actionCode) {
     List<KafkaAction> actions = Lists.newArrayList();
@@ -93,13 +177,14 @@ public class KafkaActionFactory extends BitFieldActionFactory {
     return actions;
   }
 
+  /**
+   * Get Kafka action represented by provided action name.
+   *
+   * @param name String representation of required action name.
+   * @return Kafka action represented by provided action name, or null if no such action type exists.
+   */
   @Override
   public KafkaAction getActionByName(String name) {
-    // Check the name is All
-    if (KafkaActionConstant.ALL_NAME.equalsIgnoreCase(name)) {
-      return new KafkaAction(KafkaActionType.ALL);
-    }
-    return new KafkaAction(name);
+    return KafkaActionType.hasActionType(name) ? new KafkaAction(name) : null;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
index 0d2155e..18600f1 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaAuthorizable.java
@@ -19,11 +19,31 @@ package org.apache.sentry.core.model.kafka;
 import org.apache.sentry.core.common.Authorizable;
 
 /**
- * This interface represents authorizable resource in the Kafka component.
- * It used conjunction with the generic authorization model(SENTRY-398).
+ * This interface represents authorizable resource in Kafka component.
+ * It is used in conjunction with the generic authorization model (SENTRY-398).
+ *
+ * Authorizables here are mapped to Kafka resources based on below mentioned mapping.
+ *
+ * CLUSTER ->       Kafka Cluster resource, users are required to have access to this resource in
+ *                  order to perform cluster level actions like create topic, delete topic, etc.
+ *
+ * HOST ->          Kafka allows to authorize requests based on the host it is coming from. Though,
+ *                  Host is not a resource in Kafka, each Kafka Acl has host in it. In order to
+ *                  provide host based resource authorization, Host is treated as a Kafka resource
+ *                  in Sentry.
+ *
+ * TOPIC ->         Kafka Topic resource, users are required to have access to this resource in
+ *                  order to perform topic level actions like reading from a topic, writing to a
+ *                  topic, etc.
+ *
+ * CONSUMERGROUP -> Kafka ConsumerGroup resource, users are required to have access to this resource
+ *                  in order to perform ConsumerGroup level actions like joining a consumer group,
+ *                  querying offset for a partition for a particular consumer group.
  */
 public interface KafkaAuthorizable extends Authorizable {
-  public static final String ALL = "*"; // NOPMD - TODO(sdp) Remove before merge
+  /**
+   * Types of resources that Kafka supports authorization on.
+   */
   public enum AuthorizableType {
     CLUSTER,
     HOST,
@@ -31,5 +51,9 @@ public interface KafkaAuthorizable extends Authorizable {
     CONSUMERGROUP
   };
 
+  /**
+   * Get type of this Kafka authorizable.
+   * @return Type of this Kafka authorizable.
+   */
   public AuthorizableType getAuthzType(); // NOPMD - TODO(sdp) Remove before merge
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Topic.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Topic.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Topic.java
index 9e288b0..2b7c05e 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Topic.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Topic.java
@@ -19,26 +19,42 @@ package org.apache.sentry.core.model.kafka;
  * Represents the Topic authorizable in the Kafka model
  */
 public class Topic implements KafkaAuthorizable {
+  private String name;
+
   /**
-   * Represents all topics
+   * Create a Topic authorizable for Kafka cluster of a given name.
+   *
+   * @param name Name of Kafka topic.
    */
-  public static Topic ALL = new Topic(KafkaAuthorizable.ALL);
-
-  private String name;
   public Topic(String name) {
     this.name = name;
   }
 
+  /**
+   * Get type of Kafka's topic authorizable.
+   *
+   * @return Type of Kafka's topic authorizable.
+   */
   @Override
   public AuthorizableType getAuthzType() {
     return AuthorizableType.TOPIC;
   }
 
+  /**
+   * Get name of Kafka's topic.
+   *
+   * @return Name of Kafka's topic.
+   */
   @Override
   public String getName() {
     return name;
   }
 
+  /**
+   * Get type name of Kafka's topic authorizable.
+   *
+   * @return Type name of Kafka's topic authorizable.
+   */
   @Override
   public String getTypeName() {
     return getAuthzType().name();

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
index f22ebc0..e5fc7ff 100644
--- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAction.java
@@ -25,8 +25,11 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
+/**
+ * Test KafkaActionFactory creates expected Kafka action instances.
+ */
 public class TestKafkaAction {
-  private KafkaActionFactory factory = new KafkaActionFactory();
+  private KafkaActionFactory factory = KafkaActionFactory.getInstance();
 
   @Test
   public void testImpliesAction() {
@@ -39,7 +42,6 @@ public class TestKafkaAction {
         (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
     KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
     KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL);
-    KafkaAction allNameAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL_NAME);
 
     assertTrue(allAction.implies(readAction));
     assertTrue(allAction.implies(writeAction));
@@ -112,15 +114,6 @@ public class TestKafkaAction {
     assertFalse(adminAction.implies(describeAction));
     assertTrue(adminAction.implies(adminAction));
     assertFalse(adminAction.implies(allAction));
-
-    assertTrue(allNameAction.implies(readAction));
-    assertTrue(allNameAction.implies(writeAction));
-    assertTrue(allNameAction.implies(createAction));
-    assertTrue(allNameAction.implies(deleteAction));
-    assertTrue(allNameAction.implies(alterAction));
-    assertTrue(allNameAction.implies(describeAction));
-    assertTrue(allNameAction.implies(adminAction));
-    assertTrue(allNameAction.implies(allAction));
   }
 
   @Test
@@ -134,7 +127,6 @@ public class TestKafkaAction {
         (KafkaAction) factory.getActionByName(KafkaActionConstant.DESCRIBE);
     KafkaAction adminAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.CLUSTER_ACTION);
     KafkaAction allAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL);
-    KafkaAction allNameAction = (KafkaAction) factory.getActionByName(KafkaActionConstant.ALL_NAME);
 
     assertTrue(readAction.equals(new KafkaAction(KafkaActionConstant.READ)));
     assertTrue(writeAction.equals(new KafkaAction(KafkaActionConstant.WRITE)));
@@ -144,7 +136,6 @@ public class TestKafkaAction {
     assertTrue(describeAction.equals(new KafkaAction(KafkaActionConstant.DESCRIBE)));
     assertTrue(adminAction.equals(new KafkaAction(KafkaActionConstant.CLUSTER_ACTION)));
     assertTrue(allAction.equals(new KafkaAction(KafkaActionConstant.ALL)));
-    assertTrue(allNameAction.equals(new KafkaAction(KafkaActionConstant.ALL)));
   }
 
   @Test
@@ -177,4 +168,14 @@ public class TestKafkaAction {
         alterAction, describeAction, adminAction), factory.getActionsByCode(allAction
         .getActionCode()));
   }
+
+  @Test
+  public void testGetActionForInvalidName() {
+    assertEquals("Failed to NOT create Kafka action for invalid name.", null, factory.getActionByName("INVALID"));
+  }
+
+  @Test
+  public void testGetActionForInvalidCode() {
+    assertEquals("Failed to NOT create Kafka actions for invalid code.", 0, factory.getActionsByCode(0).size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
index 1abb116..20d5e8e 100644
--- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
@@ -26,10 +26,13 @@ import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.core.model.kafka.Topic;
 import org.junit.Test;
 
+/**
+ * Test proper KafkaAuthorizable is created for various Kafka resources.
+ */
 public class TestKafkaAuthorizable {
 
   @Test
-  public void testSimpleName() throws Exception {
+  public void testName() throws Exception {
     String name = "simple";
     Host host = new Host(name);
     Assert.assertEquals(host.getName(), name);

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
index ba93036..f1ed000 100644
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
@@ -27,7 +27,7 @@ import org.apache.sentry.provider.common.KeyValue;
 public class KafkaModelAuthorizables {
   public static KafkaAuthorizable from(KeyValue keyValue) {
     String prefix = keyValue.getKey().toLowerCase();
-    String name = keyValue.getValue().toLowerCase();
+    String name = keyValue.getValue();
     for (AuthorizableType type : AuthorizableType.values()) {
       if (prefix.equalsIgnoreCase(type.name())) {
         return from(type, name);

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
index ecad355..5cdfd3f 100644
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
@@ -19,8 +19,12 @@ package org.apache.sentry.policy.kafka;
 import static org.apache.sentry.provider.common.ProviderConstants.AUTHORIZABLE_SPLITTER;
 import static org.apache.sentry.provider.common.ProviderConstants.PRIVILEGE_PREFIX;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory;
 import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
 import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.policy.common.PrivilegeValidator;
@@ -29,40 +33,87 @@ import org.apache.shiro.config.ConfigurationException;
 
 import com.google.common.collect.Lists;
 
+/**
+ * Validator for Kafka privileges.
+ * Below are the requirements for a kafka privilege to be valid.
+ * 1. Privilege must start with Host resource.
+ * 2. Privilege must have at most one non-Host resource (Cluster, Topic or ConsumerGroup), placed
+ *    after the Host resource.
+ * 3. Privilege must end with exactly one action.
+ */
 public class KafkaPrivilegeValidator implements PrivilegeValidator {
 
+  public static final String KafkaPrivilegeHelpMsg = // help text appended to validation errors
+      "Invalid Kafka privilege." +
+      " Kafka privilege must be of the form host=<HOST>-><RESOURCE>=<RESOURCE_NAME>->action=<ACTION>," +
+      " where <HOST> can be '*' or any valid host name," +
+      " <RESOURCE> can be one of " + Arrays.toString(getKafkaAuthorizablesExceptHost()) +
+      ", <RESOURCE_NAME> is name of the resource," + // leading comma separates this clause from the list
+      " <ACTION> can be one of " + Arrays.toString(KafkaActionFactory.KafkaActionType.values()) +
+      ".";
+
+  private static KafkaAuthorizable.AuthorizableType[] getKafkaAuthorizablesExceptHost() {
+    final KafkaAuthorizable.AuthorizableType[] authorizableTypes = KafkaAuthorizable.AuthorizableType.values();
+    List<KafkaAuthorizable.AuthorizableType> authorizableTypesWithoutHost = new ArrayList<>(authorizableTypes.length - 1);
+    for (KafkaAuthorizable.AuthorizableType authorizableType: authorizableTypes) {
+      if (!authorizableType.equals(KafkaAuthorizable.AuthorizableType.HOST)) {
+        authorizableTypesWithoutHost.add(authorizableType);
+      }
+    }
+    return authorizableTypesWithoutHost.toArray(new KafkaAuthorizable.AuthorizableType[authorizableTypesWithoutHost.size()]);
+  }
+
   public KafkaPrivilegeValidator() {
   }
 
   @Override
-  public void validate(PrivilegeValidatorContext context)
-      throws ConfigurationException {
-    Iterable<KafkaAuthorizable> authorizables = parsePrivilege(context.getPrivilege());
-    boolean hostnameMatched = false;
-    for (KafkaAuthorizable authorizable : authorizables) {
+  public void validate(PrivilegeValidatorContext context) throws ConfigurationException {
+    List<String> splits = Lists.newArrayList();
+    for (String section : AUTHORIZABLE_SPLITTER.split(context.getPrivilege())) {
+      splits.add(section);
+    }
+
+    // A privilege has 2 parts (host, action) or 3 parts (host, one resource, action)
+    if (splits.size() < 2 || splits.size() > 3) {
+      throw new ConfigurationException(KafkaPrivilegeHelpMsg);
+    }
+
+    // The first part must parse to a Host authorizable and must not be an action
+    if (isAction(splits.get(0))) {
+      throw new ConfigurationException("Kafka privilege can not start with an action.\n" + KafkaPrivilegeHelpMsg);
+    }
+    KafkaAuthorizable hostAuthorizable = KafkaModelAuthorizables.from(splits.get(0));
+    if (hostAuthorizable == null) {
+      throw new ConfigurationException("No Kafka authorizable found for " + splits.get(0) + ".\n" + KafkaPrivilegeHelpMsg);
+    }
+    if (!(hostAuthorizable instanceof Host)) {
+      throw new ConfigurationException("Kafka privilege must begin with host authorizable.\n" + KafkaPrivilegeHelpMsg);
+    }
+
+    // With 3 parts, the middle part must be a known, non-Host authorizable (not an action)
+    if (splits.size() == 3) {
+      if (isAction(splits.get(1))) {
+        throw new ConfigurationException("Kafka privilege can have action only at the end of privilege.\n" + KafkaPrivilegeHelpMsg);
+      }
+      KafkaAuthorizable authorizable = KafkaModelAuthorizables.from(splits.get(1));
+      if (authorizable == null) {
+        throw new ConfigurationException("No Kafka authorizable found for " + splits.get(1) + ".\n" + KafkaPrivilegeHelpMsg);
+      }
       if (authorizable instanceof Host) {
-        hostnameMatched = true;
-        break;
+        throw new ConfigurationException("Host authorizable can be specified just once in a Kafka privilege.\n" + KafkaPrivilegeHelpMsg);
       }
     }
-    if (!hostnameMatched) {
-      String msg = "host=[name] in " + context.getPrivilege() + " is required.";
-      throw new ConfigurationException(msg);
+
+    // The last part must be an action that isAction() recognizes
+    if (!isAction(splits.get(splits.size() - 1))) {
+      throw new ConfigurationException("Kafka privilege must end with a valid action.\n" + KafkaPrivilegeHelpMsg);
     }
   }
 
-  private Iterable<KafkaAuthorizable> parsePrivilege(String string) {
-    List<KafkaAuthorizable> result = Lists.newArrayList();
-    for(String section : AUTHORIZABLE_SPLITTER.split(string)) {
-      if(!section.toLowerCase().startsWith(PRIVILEGE_PREFIX)) {
-        KafkaAuthorizable authorizable = KafkaModelAuthorizables.from(section);
-        if(authorizable == null) {
-          String msg = "No authorizable found for " + section;
-          throw new ConfigurationException(msg);
-        }
-        result.add(authorizable);
-      }
-    }
-    return result;
+  private boolean isAction(String privilegePart) {
+    final String privilege = privilegePart.toLowerCase();
+    // Strip only the leading PRIVILEGE_PREFIX; replace() would also drop any later occurrences.
+    return privilege.startsWith(PRIVILEGE_PREFIX) && KafkaActionFactory.getInstance()
+        .getActionByName(privilege.substring(PRIVILEGE_PREFIX.length())) != null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
index e04aeb7..76aeb80 100644
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
@@ -21,6 +21,7 @@ import static org.apache.sentry.provider.common.ProviderConstants.AUTHORIZABLE_S
 import java.util.List;
 
 import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
 import org.apache.sentry.policy.common.Privilege;
 import org.apache.sentry.policy.common.PrivilegeFactory;
 import org.apache.sentry.provider.common.KeyValue;
@@ -32,6 +33,8 @@ import com.google.common.collect.Lists;
 
 public class KafkaWildcardPrivilege implements Privilege {
 
+  private static final String ALL_HOSTS = "*"; // host value treated as a wildcard for all hosts
+
   public static class Factory implements PrivilegeFactory {
     @Override
     public Privilege createPrivilege(String permission) {
@@ -107,17 +110,29 @@ public class KafkaWildcardPrivilege implements Privilege {
   private boolean impliesKeyValue(KeyValue policyPart, KeyValue requestPart) {
     Preconditions.checkState(policyPart.getKey().equalsIgnoreCase(requestPart.getKey()),
         "Please report, this method should not be called with two different keys");
-    if(policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL) ||
-        policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL_NAME) ||
-        policyPart.equals(requestPart)) {
+
+    // Host is a special resource that Kafka itself does not declare as a resource. A Kafka
+    // resource can be authorized based on the host a request originated from, so Sentry models
+    // host as a resource. Kafka allows '*' as a wildcard for all hosts; '*' is, however, not a
+    // valid Kafka action.
+    if (hasHostWildCard(policyPart)) {
       return true;
-    } else if (!KafkaActionConstant.actionName.equalsIgnoreCase(policyPart.getKey())
-        && KafkaActionConstant.ALL.equalsIgnoreCase(requestPart.getValue())) {
-      /* privilege request is to match with any object of given type */
+    }
+
+    if (KafkaActionConstant.actionName.equalsIgnoreCase(policyPart.getKey())) { // is action
+      return policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL) ||
+          policyPart.equals(requestPart);
+    } else {
+      return policyPart.getValue().equals(requestPart.getValue());
+    }
+  }
+
+  private boolean hasHostWildCard(KeyValue policyPart) { // true only for a host part whose value is ALL_HOSTS
+    if (policyPart.getKey().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.toString()) &&
+        policyPart.getValue().equalsIgnoreCase(ALL_HOSTS)) {
       return true;
     }
     return false;
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
index 46a0078..513c271 100644
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
@@ -21,7 +21,10 @@ package org.apache.sentry.policy.kafka;
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNull;
 
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.ConsumerGroup;
 import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.Topic;
 import org.junit.Test;
 
 public class TestKafkaModelAuthorizables {
@@ -51,4 +54,19 @@ public class TestKafkaModelAuthorizables {
   public void testNotAuthorizable() throws Exception {
     assertNull(KafkaModelAuthorizables.from("k=v"));
   }
+
+  @Test
+  public void testResourceNameIsCaseSensitive() throws Exception {
+    Host host1 = (Host)KafkaModelAuthorizables.from("HOST=Host1");
+    assertEquals("Host1", host1.getName());
+
+    Cluster cluster1 = (Cluster)KafkaModelAuthorizables.from("Cluster=cLuster1");
+    assertEquals("cLuster1", cluster1.getName());
+
+    Topic topic1 = (Topic)KafkaModelAuthorizables.from("topic=topiC1");
+    assertEquals("topiC1", topic1.getName());
+
+    ConsumerGroup consumergroup1 = (ConsumerGroup)KafkaModelAuthorizables.from("ConsumerGroup=CG1");
+    assertEquals("CG1", consumergroup1.getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
index ba670f7..9e58895 100644
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
@@ -31,27 +31,24 @@ public class TestKafkaPrivilegeValidator {
     try {
       kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1"));
     } catch (ConfigurationException ex) {
-      Assert.fail("Unexpected ConfigurationException.");
+      Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
     }
   }
 
   @Test
   public void testWithoutHostResource() throws Exception {
     KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    testHostResourceIsChecked(kafkaPrivilegeValidator, "cluster=c1->action=read");
+    testHostResourceIsChecked(kafkaPrivilegeValidator, "topic=t1->action=read");
+    testHostResourceIsChecked(kafkaPrivilegeValidator, "consumergroup=g1->action=read");
+  }
+
+  private void testHostResourceIsChecked(KafkaPrivilegeValidator kafkaPrivilegeValidator, String privilege) {
     try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("cluster=c1->action=read"));
-      Assert.fail("Expected ConfigurationException");
-    } catch (ConfigurationException ex) {
-    }
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("topic=t1->action=read"));
-      Assert.fail("Expected ConfigurationException");
-    } catch (ConfigurationException ex) {
-    }
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("consumergroup=g1->action=read"));
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext(privilege));
       Assert.fail("Expected ConfigurationException");
     } catch (ConfigurationException ex) {
+      Assert.assertEquals("Kafka privilege must begin with host authorizable.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
     }
   }
 
@@ -115,4 +112,58 @@ public class TestKafkaPrivilegeValidator {
     }
   }
 
+  @Test
+  public void testPrivilegeMustHaveExcatlyOneHost() {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->host=host2->action=read"));
+      Assert.fail("Multiple Host resources are not allowed within a Kafka privilege.");
+    } catch (ConfigurationException ex) {
+      Assert.assertEquals("Host authorizable can be specified just once in a Kafka privilege.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testPrivilegeCanNotStartWithAction() {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("action=write->host=host1->topic=t1"));
+      Assert.fail("Kafka privilege can not start with an action.");
+    } catch (ConfigurationException ex) {
+      Assert.assertEquals("Kafka privilege can not start with an action.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testPrivilegeWithMoreParts() {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->consumergroup=cg1->action=read"));
+      Assert.fail("Kafka privilege can have one Host authorizable, at most one non Host authorizable and one action.");
+    } catch (ConfigurationException ex) {
+      Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testPrivilegeNotEndingWithAction() {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->consumergroup=cg1"));
+      Assert.fail("Kafka privilege must end with a valid action.");
+    } catch (ConfigurationException ex) {
+      Assert.assertEquals("Kafka privilege must end with a valid action.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testPrivilegeNotEndingWithValidAction() {
+    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
+    try {
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->action=bla"));
+      Assert.fail("Kafka privilege must end with a valid action.");
+    } catch (ConfigurationException ex) {
+      Assert.assertEquals("Kafka privilege must end with a valid action.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
index 720c98f..8566984 100644
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
@@ -59,6 +59,10 @@ public class TestKafkaWildcardPrivilege {
       create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.WRITE));
 
 
+  private static final Privilege KAFKA_CLUSTER1_HOST1_ALL =
+      create(new KeyValue("CLUSTER", "cluster1"), new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.ALL));
+
+
   @Test
   public void testSimpleAction() throws Exception {
     //host

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
index 4da506b..810c05e 100644
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
@@ -35,8 +35,8 @@ import com.google.common.io.Files;
 
 public abstract class AbstractTestKafkaPolicyEngine {
 
-  private static final String ADMIN = "host=*";
-  private static final String ADMIN_HOST1 = "host=host1";
+  private static final String ADMIN = "host=*->action=all";
+  private static final String ADMIN_HOST1 = "host=host1->action=all";
   private static final String CONSUMER_T1_ALL = "host=*->topic=t1->action=read";
   private static final String CONSUMER_T1_HOST1 = "host=host1->topic=t1->action=read";
   private static final String CONSUMER_T2_HOST2 = "host=host2->topic=t2->action=read";

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
index 0186cc9..1cb694a 100644
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
@@ -86,7 +86,7 @@ public class TestKafkaPolicyNegative {
     append("[groups]", globalPolicyFile);
     append("group = malicious_role", globalPolicyFile);
     append("[roles]", globalPolicyFile);
-    append("malicious_role = host=*", globalPolicyFile);
+    append("malicious_role = host=*->action=read", globalPolicyFile);
     PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath());
     ImmutableSet<String> permissions = policy.getAllPrivileges(Sets.newHashSet("group"), ActiveRoleSet.ALL);
     Assert.assertTrue(permissions.toString(), permissions.size() == 1);

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/184a32d6/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini b/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
index c533e69..1951aba 100644
--- a/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
+++ b/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
@@ -27,8 +27,8 @@ producer_group2 = producer_t2_host2
 consumer_producer_group0 = consumer_producer_t1
 
 [roles]
-admin_all = host=*
-admin_host1 = host=host1
+admin_all = host=*->action=all
+admin_host1 = host=host1->action=all
 consumer_t1_all = host=*->topic=t1->action=read
 consumer_t1_host1 = host=host1->topic=t1->action=read
 consumer_t2_host2 = host=host2->topic=t2->action=read