You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by co...@apache.org on 2016/04/22 08:28:30 UTC

[10/13] sentry git commit: SENTRY-999: Refactor the sentry to integrate with external components quickly (Colin Ma, reviewed by Dapeng Sun)

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index c6600a0..15f7359 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -42,16 +42,19 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.sentry.SentryUserException;
 import org.apache.sentry.core.common.ActiveRoleSet;
 import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.Model;
 import org.apache.sentry.core.common.Subject;
 import org.apache.sentry.core.model.kafka.KafkaActionFactory;
 import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
 import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
 import org.apache.sentry.kafka.ConvertUtil;
 import org.apache.sentry.kafka.conf.KafkaAuthConf.AuthzConfVars;
 import org.apache.sentry.policy.common.PolicyEngine;
 import org.apache.sentry.provider.common.AuthorizationComponent;
 import org.apache.sentry.provider.common.AuthorizationProvider;
 import org.apache.sentry.provider.common.ProviderBackend;
+import org.apache.sentry.provider.common.ProviderBackendContext;
 import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
 import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
 import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
@@ -72,491 +75,497 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 
 public class KafkaAuthBinding {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class);
-    private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA;
-    private static final String COMPONENT_NAME = COMPONENT_TYPE;
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class);
+  private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA;
+  private static final String COMPONENT_NAME = COMPONENT_TYPE;
 
-    private static Boolean kerberosInit;
+  private static Boolean kerberosInit;
 
-    private final Configuration authConf;
-    private final AuthorizationProvider authProvider;
-    private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
+  private final Configuration authConf;
+  private final AuthorizationProvider authProvider;
+  private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
 
-    private ProviderBackend providerBackend;
-    private String instanceName;
-    private String requestorName;
-    private java.util.Map<String, ?> kafkaConfigs;
+  private ProviderBackend providerBackend;
+  private String instanceName;
+  private String requestorName;
+  private java.util.Map<String, ?> kafkaConfigs;
 
 
-    public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception {
-        this.instanceName = instanceName;
-        this.requestorName = requestorName;
-        this.authConf = authConf;
-        this.kafkaConfigs = kafkaConfigs;
-        this.authProvider = createAuthProvider();
-    }
+  public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception {
+    this.instanceName = instanceName;
+    this.requestorName = requestorName;
+    this.authConf = authConf;
+    this.kafkaConfigs = kafkaConfigs;
+    this.authProvider = createAuthProvider();
+  }
 
+  /**
+   * Instantiate the configured authz provider
+   *
+   * @return {@link AuthorizationProvider}
+   */
+  private AuthorizationProvider createAuthProvider() throws Exception {
     /**
-     * Instantiate the configured authz provider
-     *
-     * @return {@link AuthorizationProvider}
+     * get the authProvider class, policyEngine class, providerBackend class and resources from the
+     * kafkaAuthConf config
      */
-    private AuthorizationProvider createAuthProvider() throws Exception {
-        /**
-         * get the authProvider class, policyEngine class, providerBackend class and resources from the
-         * kafkaAuthConf config
-         */
-        String authProviderName =
-            authConf.get(AuthzConfVars.AUTHZ_PROVIDER.getVar(),
-                AuthzConfVars.AUTHZ_PROVIDER.getDefault());
-        String resourceName =
-            authConf.get(AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(),
-                AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getDefault());
-        String providerBackendName =
-            authConf.get(AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(),
-                AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getDefault());
-        String policyEngineName =
-            authConf.get(AuthzConfVars.AUTHZ_POLICY_ENGINE.getVar(),
-                AuthzConfVars.AUTHZ_POLICY_ENGINE.getDefault());
-        if (resourceName != null && resourceName.startsWith("classpath:")) {
-            String resourceFileName = resourceName.substring("classpath:".length());
-            resourceName = AuthorizationProvider.class.getClassLoader().getResource(resourceFileName).getPath();
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Using authorization provider " + authProviderName + " with resource "
-                + resourceName + ", policy engine " + policyEngineName + ", provider backend "
-                + providerBackendName);
-        }
-
-        // Initiate kerberos via UserGroupInformation if required
-        if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE))
-                && kafkaConfigs != null) {
-            String keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString();
-            String principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString();
-            if (keytabProp != null && principalProp != null) {
-                String actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar()).toString();
-                if (actualHost != null) {
-                    principalProp = SecurityUtil.getServerPrincipal(principalProp, actualHost);
-                }
-                initKerberos(keytabProp, principalProp);
-            } else {
-                LOG.debug("Could not initialize Kerberos.\n" +
-                        AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString() + "\n" +
-                        AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString());
-            }
-        } else {
-            LOG.debug("Could not initialize Kerberos as no kafka config provided. " +
-                    AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() +
-                    " are required configs to be able to initialize Kerberos");
-        }
-
-        // Instantiate the configured providerBackend
-        Constructor<?> providerBackendConstructor =
-            Class.forName(providerBackendName)
-                .getDeclaredConstructor(Configuration.class, String.class);
-        providerBackendConstructor.setAccessible(true);
-        providerBackend =
-            (ProviderBackend) providerBackendConstructor.newInstance(new Object[]{authConf,
-                resourceName});
-        if (providerBackend instanceof SentryGenericProviderBackend) {
-            ((SentryGenericProviderBackend) providerBackend).setComponentType(COMPONENT_TYPE);
-            ((SentryGenericProviderBackend) providerBackend).setServiceName(instanceName);
-        }
-
-        // Instantiate the configured policyEngine
-        Constructor<?> policyConstructor =
-            Class.forName(policyEngineName).getDeclaredConstructor(ProviderBackend.class);
-        policyConstructor.setAccessible(true);
-        PolicyEngine policyEngine =
-            (PolicyEngine) policyConstructor.newInstance(new Object[]{providerBackend});
-
-        // Instantiate the configured authProvider
-        Constructor<?> constructor =
-            Class.forName(authProviderName).getDeclaredConstructor(Configuration.class, String.class,
-                PolicyEngine.class);
-        constructor.setAccessible(true);
-        return (AuthorizationProvider) constructor.newInstance(new Object[]{authConf, resourceName,
-            policyEngine});
+    String authProviderName =
+        authConf.get(AuthzConfVars.AUTHZ_PROVIDER.getVar(),
+            AuthzConfVars.AUTHZ_PROVIDER.getDefault());
+    String resourceName =
+        authConf.get(AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(),
+            AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getDefault());
+    String providerBackendName =
+        authConf.get(AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(),
+            AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getDefault());
+    String policyEngineName =
+        authConf.get(AuthzConfVars.AUTHZ_POLICY_ENGINE.getVar(),
+            AuthzConfVars.AUTHZ_POLICY_ENGINE.getDefault());
+    if (resourceName != null && resourceName.startsWith("classpath:")) {
+      String resourceFileName = resourceName.substring("classpath:".length());
+      resourceName = AuthorizationProvider.class.getClassLoader().getResource(resourceFileName).getPath();
     }
-
-    /**
-     * Authorize access to a Kafka privilege
-     */
-    public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
-        List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(session.clientAddress().getHostAddress(), resource);
-        Set<KafkaAction> actions = Sets.newHashSet(actionFactory.getActionByName(operation.name()));
-        return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using authorization provider " + authProviderName + " with resource "
+          + resourceName + ", policy engine " + policyEngineName + ", provider backend "
+          + providerBackendName);
     }
 
-    public void addAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
-        verifyAcls(acls);
-        LOG.info("Adding Acl: acl->" + acls + " resource->" + resource);
-
-        final Iterator<Acl> iterator = acls.iterator();
-        while (iterator.hasNext()) {
-            final Acl acl = iterator.next();
-            final String role = getRole(acl);
-            if (!roleExists(role)) {
-                throw new KafkaException("Can not add Acl for non-existent Role: " + role);
-            }
-            execute(new Command<Void>() {
-                @Override
-                public Void run(SentryGenericServiceClient client) throws Exception {
-                    client.grantPrivilege(
-                        requestorName, role, COMPONENT_NAME, toTSentryPrivilege(acl, resource));
-                    return null;
-                }
-            });
-        }
+    // Initiate kerberos via UserGroupInformation if required
+    if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE))
+            && kafkaConfigs != null) {
+      String keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString();
+      String principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString();
+      if (keytabProp != null && principalProp != null) {
+          String actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar()).toString();
+          if (actualHost != null) {
+              principalProp = SecurityUtil.getServerPrincipal(principalProp, actualHost);
+          }
+          initKerberos(keytabProp, principalProp);
+      } else {
+          LOG.debug("Could not initialize Kerberos.\n" +
+                  AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString() + "\n" +
+                  AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString());
+      }
+    } else {
+      LOG.debug("Could not initialize Kerberos as no kafka config provided. " +
+              AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() +
+              " are required configs to be able to initialize Kerberos");
     }
 
-    public boolean removeAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
-        verifyAcls(acls);
-        LOG.info("Removing Acl: acl->" + acls + " resource->" + resource);
-        final Iterator<Acl> iterator = acls.iterator();
-        while (iterator.hasNext()) {
-            final Acl acl = iterator.next();
-            final String role = getRole(acl);
-            try {
-                execute(new Command<Void>() {
-                    @Override
-                    public Void run(SentryGenericServiceClient client) throws Exception {
-                        client.dropPrivilege(
-                                requestorName, role, toTSentryPrivilege(acl, resource));
-                        return null;
-                    }
-                });
-            } catch (KafkaException kex) {
-                LOG.error("Failed to remove acls.", kex);
-                return false;
-            }
-        }
-
-        return true;
+    // Instantiate the configured providerBackend
+    Constructor<?> providerBackendConstructor =
+        Class.forName(providerBackendName)
+            .getDeclaredConstructor(Configuration.class, String.class);
+    providerBackendConstructor.setAccessible(true);
+    providerBackend =
+        (ProviderBackend) providerBackendConstructor.newInstance(new Object[]{authConf,
+            resourceName});
+    if (providerBackend instanceof SentryGenericProviderBackend) {
+      ((SentryGenericProviderBackend) providerBackend).setComponentType(COMPONENT_TYPE);
+      ((SentryGenericProviderBackend) providerBackend).setServiceName(instanceName);
     }
 
-    public void addRole(final String role) {
-        if (roleExists(role)) {
-            throw new KafkaException("Can not create an existing role, " + role + ", again.");
+    // Create backend context
+    ProviderBackendContext context = new ProviderBackendContext();
+    context.setAllowPerDatabase(false);
+    context.setValidators(KafkaPrivilegeModel.getInstance().getPrivilegeValidators());
+    providerBackend.initialize(context);
+
+    // Instantiate the configured policyEngine
+    Constructor<?> policyConstructor =
+        Class.forName(policyEngineName).getDeclaredConstructor(ProviderBackend.class);
+    policyConstructor.setAccessible(true);
+    PolicyEngine policyEngine =
+        (PolicyEngine) policyConstructor.newInstance(new Object[]{providerBackend});
+
+    // Instantiate the configured authProvider
+    Constructor<?> constructor =
+        Class.forName(authProviderName).getDeclaredConstructor(Configuration.class, String.class,
+            PolicyEngine.class, Model.class);
+    constructor.setAccessible(true);
+    return (AuthorizationProvider) constructor.newInstance(new Object[]{authConf, resourceName,
+        policyEngine, KafkaPrivilegeModel.getInstance()});
+  }
+
+  /**
+   * Authorize access to a Kafka privilege
+   */
+  public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
+      List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(session.clientAddress().getHostAddress(), resource);
+      Set<KafkaAction> actions = Sets.newHashSet(actionFactory.getActionByName(operation.name()));
+      return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL);
+  }
+
+  public void addAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
+    verifyAcls(acls);
+    LOG.info("Adding Acl: acl->" + acls + " resource->" + resource);
+
+    final Iterator<Acl> iterator = acls.iterator();
+    while (iterator.hasNext()) {
+      final Acl acl = iterator.next();
+      final String role = getRole(acl);
+      if (!roleExists(role)) {
+        throw new KafkaException("Can not add Acl for non-existent Role: " + role);
+      }
+      execute(new Command<Void>() {
+        @Override
+        public Void run(SentryGenericServiceClient client) throws Exception {
+          client.grantPrivilege(
+              requestorName, role, COMPONENT_NAME, toTSentryPrivilege(acl, resource));
+          return null;
         }
-
-        execute(new Command<Void>() {
-            @Override
-            public Void run(SentryGenericServiceClient client) throws Exception {
-                client.createRole(
-                    requestorName, role, COMPONENT_NAME);
-                return null;
-            }
-        });
-    }
-
-    public void addRoleToGroups(final String role, final java.util.Set<String> groups) {
-        execute(new Command<Void>() {
-            @Override
-            public Void run(SentryGenericServiceClient client) throws Exception {
-                client.addRoleToGroups(
-                    requestorName, role, COMPONENT_NAME, groups);
-                return null;
-            }
-        });
+      });
     }
-
-    public void dropAllRoles() {
-        final List<String> roles = getAllRoles();
+  }
+
+  public boolean removeAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
+    verifyAcls(acls);
+    LOG.info("Removing Acl: acl->" + acls + " resource->" + resource);
+    final Iterator<Acl> iterator = acls.iterator();
+    while (iterator.hasNext()) {
+      final Acl acl = iterator.next();
+      final String role = getRole(acl);
+      try {
         execute(new Command<Void>() {
-            @Override
-            public Void run(SentryGenericServiceClient client) throws Exception {
-                for (String role : roles) {
-                    client.dropRole(requestorName, role, COMPONENT_NAME);
-                }
-                return null;
-            }
+          @Override
+          public Void run(SentryGenericServiceClient client) throws Exception {
+            client.dropPrivilege(
+                    requestorName, role, toTSentryPrivilege(acl, resource));
+            return null;
+          }
         });
+      } catch (KafkaException kex) {
+        LOG.error("Failed to remove acls.", kex);
+        return false;
+      }
     }
 
-    private List<String> getRolesforGroup(final String groupName) {
-        final List<String> roles = new ArrayList<>();
-        execute(new Command<Void>() {
-            @Override
-            public Void run(SentryGenericServiceClient client) throws Exception {
-                for (TSentryRole tSentryRole : client.listRolesByGroupName(requestorName, groupName, COMPONENT_NAME)) {
-                    roles.add(tSentryRole.getRoleName());
-                }
-                return null;
-            }
-        });
-
-        return roles;
-    }
+    return true;
+  }
 
-    private SentryGenericServiceClient getClient() throws Exception {
-        return SentryGenericServiceClientFactory.create(this.authConf);
+  public void addRole(final String role) {
+    if (roleExists(role)) {
+      throw new KafkaException("Can not create an existing role, " + role + ", again.");
     }
 
-    public boolean removeAcls(final Resource resource) {
-        LOG.info("Removing Acls for Resource: resource->" + resource);
-        List<String> roles = getAllRoles();
-        final List<TSentryPrivilege> tSentryPrivileges = getAllPrivileges(roles);
-        try {
-            execute(new Command<Void>() {
-                @Override
-                public Void run(SentryGenericServiceClient client) throws Exception {
-                    for (TSentryPrivilege tSentryPrivilege : tSentryPrivileges) {
-                        if (isPrivilegeForResource(tSentryPrivilege, resource)) {
-                            client.dropPrivilege(
-                                    requestorName, COMPONENT_NAME, tSentryPrivilege);
-                        }
-                    }
-                    return null;
-                }
-            });
-        } catch (KafkaException kex) {
-            LOG.error("Failed to remove acls.", kex);
-            return false;
+    execute(new Command<Void>() {
+      @Override
+      public Void run(SentryGenericServiceClient client) throws Exception {
+        client.createRole(
+            requestorName, role, COMPONENT_NAME);
+        return null;
+      }
+    });
+  }
+
+  public void addRoleToGroups(final String role, final java.util.Set<String> groups) {
+    execute(new Command<Void>() {
+      @Override
+      public Void run(SentryGenericServiceClient client) throws Exception {
+        client.addRoleToGroups(
+            requestorName, role, COMPONENT_NAME, groups);
+        return null;
+      }
+    });
+  }
+
+  public void dropAllRoles() {
+    final List<String> roles = getAllRoles();
+    execute(new Command<Void>() {
+      @Override
+      public Void run(SentryGenericServiceClient client) throws Exception {
+        for (String role : roles) {
+          client.dropRole(requestorName, role, COMPONENT_NAME);
         }
-
-        return true;
-    }
-
-    public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) {
-        final Option<scala.collection.immutable.Set<Acl>> acls = getAcls().get(resource);
-        if (acls.nonEmpty())
-            return acls.get();
-        return new scala.collection.immutable.HashSet<Acl>();
-    }
-
-    public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal principal) {
-        if (principal.getPrincipalType().toLowerCase().equals("group")) {
-            List<String> roles = getRolesforGroup(principal.getName());
-            return getAclsForRoles(roles);
-        } else {
-            LOG.info("Did not recognize Principal type: " + principal.getPrincipalType() + ". Returning Acls for all principals.");
-            return getAcls();
+        return null;
+      }
+    });
+  }
+
+  private List<String> getRolesforGroup(final String groupName) {
+    final List<String> roles = new ArrayList<>();
+    execute(new Command<Void>() {
+      @Override
+      public Void run(SentryGenericServiceClient client) throws Exception {
+        for (TSentryRole tSentryRole : client.listRolesByGroupName(requestorName, groupName, COMPONENT_NAME)) {
+          roles.add(tSentryRole.getRoleName());
         }
-    }
-
-    public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() {
-        final List<String> roles = getAllRoles();
-        return getAclsForRoles(roles);
-    }
-
-    /**
-     * A Command is a closure used to pass a block of code from individual
-     * functions to execute, which centralizes connection error
-     * handling. Command is parameterized on the return type of the function.
-     */
-    private interface Command<T> {
-        T run(SentryGenericServiceClient client) throws Exception;
-    }
-
-    private <T> T execute(Command<T> cmd) throws KafkaException {
-        SentryGenericServiceClient client = null;
-        try {
-            client = getClient();
-            return cmd.run(client);
-        } catch (SentryUserException ex) {
-            String msg = "Unable to excute command on sentry server: " + ex.getMessage();
-            LOG.error(msg, ex);
-            throw new KafkaException(msg, ex);
-        } catch (Exception ex) {
-            String msg = "Unable to obtain client:" + ex.getMessage();
-            LOG.error(msg, ex);
-            throw new KafkaException(msg, ex);
-        } finally {
-            if (client != null) {
-                client.close();
+        return null;
+      }
+    });
+
+    return roles;
+  }
+
+  private SentryGenericServiceClient getClient() throws Exception {
+    return SentryGenericServiceClientFactory.create(this.authConf);
+  }
+
+  public boolean removeAcls(final Resource resource) {
+    LOG.info("Removing Acls for Resource: resource->" + resource);
+    List<String> roles = getAllRoles();
+    final List<TSentryPrivilege> tSentryPrivileges = getAllPrivileges(roles);
+    try {
+      execute(new Command<Void>() {
+        @Override
+        public Void run(SentryGenericServiceClient client) throws Exception {
+          for (TSentryPrivilege tSentryPrivilege : tSentryPrivileges) {
+            if (isPrivilegeForResource(tSentryPrivilege, resource)) {
+              client.dropPrivilege(
+                        requestorName, COMPONENT_NAME, tSentryPrivilege);
             }
+          }
+          return null;
         }
+      });
+    } catch (KafkaException kex) {
+      LOG.error("Failed to remove acls.", kex);
+      return false;
     }
 
-    private TSentryPrivilege toTSentryPrivilege(Acl acl, Resource resource) {
-        final List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(acl.host(), resource);
-        final List<TAuthorizable> tAuthorizables = new ArrayList<>();
-        for (Authorizable authorizable : authorizables) {
-            tAuthorizables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName()));
-        }
-        TSentryPrivilege tSentryPrivilege = new TSentryPrivilege(COMPONENT_NAME, instanceName, tAuthorizables, acl.operation().name());
-        return tSentryPrivilege;
-    }
-
-    private String getRole(Acl acl) {
-        return acl.principal().getName();
-    }
-
-    private boolean isPrivilegeForResource(TSentryPrivilege tSentryPrivilege, Resource resource) {
-        final java.util.Iterator<TAuthorizable> authorizablesIterator = tSentryPrivilege.getAuthorizablesIterator();
-        while (authorizablesIterator.hasNext()) {
-            TAuthorizable tAuthorizable = authorizablesIterator.next();
-            if (tAuthorizable.getType().equals(resource.resourceType().name())) {
-                return true;
-            }
-        }
-        return false;
+    return true;
+  }
+
+  public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) {
+    final Option<scala.collection.immutable.Set<Acl>> acls = getAcls().get(resource);
+    if (acls.nonEmpty())
+      return acls.get();
+    return new scala.collection.immutable.HashSet<Acl>();
+  }
+
+  public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal principal) {
+    if (principal.getPrincipalType().toLowerCase().equals("group")) {
+      List<String> roles = getRolesforGroup(principal.getName());
+      return getAclsForRoles(roles);
+    } else {
+      LOG.info("Did not recognize Principal type: " + principal.getPrincipalType() + ". Returning Acls for all principals.");
+      return getAcls();
     }
-
-    private List<TSentryPrivilege> getAllPrivileges(final List<String> roles) {
-        final List<TSentryPrivilege> tSentryPrivileges = new ArrayList<>();
-        execute(new Command<Void>() {
-            @Override
-            public Void run(SentryGenericServiceClient client) throws Exception {
-                for (String role : roles) {
-                    tSentryPrivileges.addAll(client.listPrivilegesByRoleName(
-                        requestorName, role, COMPONENT_NAME, instanceName));
-                }
-                return null;
-            }
-        });
-
-        return tSentryPrivileges;
+  }
+
+  public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() {
+    final List<String> roles = getAllRoles();
+    return getAclsForRoles(roles);
+  }
+
+  /**
+   * A Command is a closure used to pass a block of code from individual
+   * functions to execute, which centralizes connection error
+   * handling. Command is parameterized on the return type of the function.
+   */
+  private interface Command<T> {
+    T run(SentryGenericServiceClient client) throws Exception;
+  }
+
+  private <T> T execute(Command<T> cmd) throws KafkaException {
+    SentryGenericServiceClient client = null;
+    try {
+      client = getClient();
+      return cmd.run(client);
+    } catch (SentryUserException ex) {
+      String msg = "Unable to excute command on sentry server: " + ex.getMessage();
+      LOG.error(msg, ex);
+      throw new KafkaException(msg, ex);
+    } catch (Exception ex) {
+      String msg = "Unable to obtain client:" + ex.getMessage();
+      LOG.error(msg, ex);
+      throw new KafkaException(msg, ex);
+    } finally {
+      if (client != null) {
+        client.close();
+      }
     }
+  }
 
-    private List<String> getAllRoles() {
-        final List<String> roles = new ArrayList<>();
-        execute(new Command<Void>() {
-            @Override
-            public Void run(SentryGenericServiceClient client) throws Exception {
-                for (TSentryRole tSentryRole : client.listAllRoles(requestorName, COMPONENT_NAME)) {
-                    roles.add(tSentryRole.getRoleName());
-                }
-                return null;
-            }
-        });
-
-        return roles;
+  private TSentryPrivilege toTSentryPrivilege(Acl acl, Resource resource) {
+    final List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(acl.host(), resource);
+    final List<TAuthorizable> tAuthorizables = new ArrayList<>();
+    for (Authorizable authorizable : authorizables) {
+      tAuthorizables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName()));
     }
-
-    private Map<Resource, scala.collection.immutable.Set<Acl>> getAclsForRoles(final List<String> roles) {
-        return scala.collection.JavaConverters.mapAsScalaMapConverter(
-                rolePrivilegesToResourceAcls(getRoleToPrivileges(roles)))
-                .asScala().toMap(Predef.<Tuple2<Resource, scala.collection.immutable.Set<Acl>>>conforms());
+    TSentryPrivilege tSentryPrivilege = new TSentryPrivilege(COMPONENT_NAME, instanceName, tAuthorizables, acl.operation().name());
+    return tSentryPrivilege;
+  }
+
+  private String getRole(Acl acl) {
+    return acl.principal().getName();
+  }
+
+  private boolean isPrivilegeForResource(TSentryPrivilege tSentryPrivilege, Resource resource) { // True when any authorizable of the privilege has the same type as the resource.
+    final java.util.Iterator<TAuthorizable> authorizablesIterator = tSentryPrivilege.getAuthorizablesIterator();
+    while (authorizablesIterator.hasNext()) {
+      TAuthorizable tAuthorizable = authorizablesIterator.next();
+      if (tAuthorizable.getType().equals(resource.resourceType().name())) { // NOTE(review): only the type is compared, the resource name is not - confirm this is intended
+        return true;
+      }
     }
-
-    private java.util.Map<Resource, scala.collection.immutable.Set<Acl>> rolePrivilegesToResourceAcls(java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap) {
-        final java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap = new HashMap<>();
-        for (String role : rolePrivilegesMap.keySet()) {
-            scala.collection.immutable.Set<TSentryPrivilege> privileges = rolePrivilegesMap.get(role);
-            final Iterator<TSentryPrivilege> iterator = privileges.iterator();
-            while (iterator.hasNext()) {
-                TSentryPrivilege privilege = iterator.next();
-                final List<TAuthorizable> authorizables = privilege.getAuthorizables();
-                String host = null;
-                String operation = privilege.getAction();
-                for (TAuthorizable tAuthorizable : authorizables) {
-                    if (tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name())) {
-                        host = tAuthorizable.getName();
-                    } else {
-                        Resource resource = new Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()), tAuthorizable.getName());
-                        if (operation.equals("*")) {
-                            operation = "All";
-                        }
-                        Acl acl = new Acl(new KafkaPrincipal("role", role), Allow$.MODULE$, host, Operation$.MODULE$.fromString(operation));
-                        Set<Acl> newAclsJava = new HashSet<Acl>();
-                        newAclsJava.add(acl);
-                        addExistingAclsForResource(resourceAclsMap, resource, newAclsJava);
-                        final scala.collection.mutable.Set<Acl> aclScala = JavaConversions.asScalaSet(newAclsJava);
-                        resourceAclsMap.put(resource, aclScala.<Acl>toSet());
-                    }
-                }
-            }
+    return false;
+  }
+
+  private List<TSentryPrivilege> getAllPrivileges(final List<String> roles) { // Collects the privileges of every given role into one flat list.
+    final List<TSentryPrivilege> tSentryPrivileges = new ArrayList<>();
+    execute(new Command<Void>() {
+      @Override
+      public Void run(SentryGenericServiceClient client) throws Exception {
+        for (String role : roles) {
+          tSentryPrivileges.addAll(client.listPrivilegesByRoleName( // one client call per role, scoped to COMPONENT_NAME and instanceName
+                requestorName, role, COMPONENT_NAME, instanceName));
         }
-
-        return resourceAclsMap;
-    }
-
-    private java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> getRoleToPrivileges(final List<String> roles) {
-        final java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap = new HashMap<>();
-        execute(new Command<Void>() {
-            @Override
-            public Void run(SentryGenericServiceClient client) throws Exception {
-                for (String role : roles) {
-                    final Set<TSentryPrivilege> rolePrivileges = client.listPrivilegesByRoleName(
-                        requestorName, role, COMPONENT_NAME, instanceName);
-                    final scala.collection.immutable.Set<TSentryPrivilege> rolePrivilegesScala =
-                        scala.collection.JavaConverters.asScalaSetConverter(rolePrivileges).asScala().toSet();
-                    rolePrivilegesMap.put(role, rolePrivilegesScala);
-                }
-                return null;
+        return null; // the result is delivered through the captured list, not the command return value
+      }
+    });
+
+    return tSentryPrivileges;
+  }
+
+  private List<String> getAllRoles() { // Returns the names of all roles that listAllRoles reports for requestorName and COMPONENT_NAME.
+    final List<String> roles = new ArrayList<>();
+    execute(new Command<Void>() {
+      @Override
+      public Void run(SentryGenericServiceClient client) throws Exception {
+        for (TSentryRole tSentryRole : client.listAllRoles(requestorName, COMPONENT_NAME)) {
+          roles.add(tSentryRole.getRoleName()); // keep only the role name
+        }
+        return null; // the result is delivered through the captured list
+      }
+    });
+
+    return roles;
+  }
+
+  private Map<Resource, scala.collection.immutable.Set<Acl>> getAclsForRoles(final List<String> roles) { // Fetches the privileges of the roles, groups them as ACLs per resource, and converts the Java map to a Scala map.
+    return scala.collection.JavaConverters.mapAsScalaMapConverter(
+              rolePrivilegesToResourceAcls(getRoleToPrivileges(roles)))
+              .asScala().toMap(Predef.<Tuple2<Resource, scala.collection.immutable.Set<Acl>>>conforms()); // conforms() supplies the implicit evidence that toMap needs when called from Java
+  }
+
+  private java.util.Map<Resource, scala.collection.immutable.Set<Acl>> rolePrivilegesToResourceAcls(java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap) { // Regroups role-to-privileges data into resource-to-ACLs data.
+    final java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap = new HashMap<>();
+    for (String role : rolePrivilegesMap.keySet()) {
+      scala.collection.immutable.Set<TSentryPrivilege> privileges = rolePrivilegesMap.get(role);
+      final Iterator<TSentryPrivilege> iterator = privileges.iterator();
+      while (iterator.hasNext()) {
+        TSentryPrivilege privilege = iterator.next();
+        final List<TAuthorizable> authorizables = privilege.getAuthorizables();
+        String host = null; // NOTE(review): stays null for any resource authorizable listed before the HOST entry - confirm the list order
+        String operation = privilege.getAction();
+        for (TAuthorizable tAuthorizable : authorizables) {
+          if (tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name())) {
+            host = tAuthorizable.getName(); // a HOST authorizable only records the host for the ACLs built afterwards
+          } else {
+            Resource resource = new Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()), tAuthorizable.getName()); // every non-HOST authorizable becomes a Resource
+            if (operation.equals("*")) {
+              operation = "All"; // the wildcard action is rewritten to the string All before Operation.fromString
             }
-        });
-
-        return rolePrivilegesMap;
+            Acl acl = new Acl(new KafkaPrincipal("role", role), Allow$.MODULE$, host, Operation$.MODULE$.fromString(operation)); // always an Allow ACL whose principal type is role
+            Set<Acl> newAclsJava = new HashSet<Acl>();
+            newAclsJava.add(acl);
+            addExistingAclsForResource(resourceAclsMap, resource, newAclsJava); // merge in the ACLs already collected for this resource
+            final scala.collection.mutable.Set<Acl> aclScala = JavaConversions.asScalaSet(newAclsJava);
+            resourceAclsMap.put(resource, aclScala.<Acl>toSet()); // replace the mapping with the merged set
+          }
+        }
+      }
     }
 
-    private void addExistingAclsForResource(java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap, Resource resource, java.util.Set<Acl> newAclsJava) {
-        final scala.collection.immutable.Set<Acl> existingAcls = resourceAclsMap.get(resource);
-        if (existingAcls != null) {
-            final Iterator<Acl> aclsIter = existingAcls.iterator();
-            while (aclsIter.hasNext()) {
-                Acl curAcl = aclsIter.next();
-                newAclsJava.add(curAcl);
-            }
+    return resourceAclsMap;
+  }
+
+  private java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> getRoleToPrivileges(final List<String> roles) { // Maps each given role to the set of privileges that listPrivilegesByRoleName returns for it.
+    final java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap = new HashMap<>();
+    execute(new Command<Void>() {
+      @Override
+      public Void run(SentryGenericServiceClient client) throws Exception {
+        for (String role : roles) {
+          final Set<TSentryPrivilege> rolePrivileges = client.listPrivilegesByRoleName( // one client call per role
+              requestorName, role, COMPONENT_NAME, instanceName);
+          final scala.collection.immutable.Set<TSentryPrivilege> rolePrivilegesScala =
+              scala.collection.JavaConverters.asScalaSetConverter(rolePrivileges).asScala().toSet(); // convert the Java set to an immutable Scala set
+          rolePrivilegesMap.put(role, rolePrivilegesScala);
         }
+        return null; // the result is delivered through the captured map
+      }
+    });
+
+    return rolePrivilegesMap;
+  }
+
+  private void addExistingAclsForResource(java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap, Resource resource, java.util.Set<Acl> newAclsJava) { // Copies the ACLs already mapped to the resource into newAclsJava.
+    final scala.collection.immutable.Set<Acl> existingAcls = resourceAclsMap.get(resource);
+    if (existingAcls != null) { // nothing to copy when the resource has no mapping yet
+      final Iterator<Acl> aclsIter = existingAcls.iterator();
+      while (aclsIter.hasNext()) {
+        Acl curAcl = aclsIter.next();
+        newAclsJava.add(curAcl);
+      }
     }
-
-    private boolean roleExists(String role) {
-        return getAllRoles().contains(role);
+  }
+
+  private boolean roleExists(String role) { // True when getAllRoles() contains exactly this role name; every call fetches the full role list again.
+      return getAllRoles().contains(role);
+  }
+
+  private void verifyAcls(scala.collection.immutable.Set<Acl> acls) { // Checks every ACL with Java assert statements, which the JVM evaluates only when assertions are enabled.
+    final Iterator<Acl> iterator = acls.iterator();
+    while (iterator.hasNext()) {
+      final Acl acl = iterator.next();
+      assert acl.principal().getPrincipalType().toLowerCase().equals("role") : "Only Acls with KafkaPrincipal of type \"role;\" is supported."; // principal type must be role, compared case-insensitively; NOTE(review): the message has a stray semicolon after role
+      assert acl.permissionType().name().equals(Allow.name()) : "Only Acls with Permission of type \"Allow\" is supported."; // permission type must be Allow
     }
-
-    private void verifyAcls(scala.collection.immutable.Set<Acl> acls) {
-        final Iterator<Acl> iterator = acls.iterator();
-        while (iterator.hasNext()) {
-            final Acl acl = iterator.next();
-            assert acl.principal().getPrincipalType().toLowerCase().equals("role") : "Only Acls with KafkaPrincipal of type \"role;\" is supported.";
-            assert acl.permissionType().name().equals(Allow.name()) : "Only Acls with Permission of type \"Allow\" is supported.";
-        }
+  }
+
+  /*
+  * For SSL sessions, Kafka creates user names with "CN=" prepended to the user name.
+  * Sentry uses "=" as the splitter when parsing key-value pairs, so "CN=" has to be stripped off.
+  * */
+  private String getName(RequestChannel.Session session) {
+    final String principalName = session.principal().getName();
+    int start = principalName.indexOf("CN="); // first occurrence anywhere in the name, not only at the start
+    if (start >= 0) {
+      String tmpName, name = "";
+      tmpName = principalName.substring(start + 3); // text after the CN= marker
+      int end = tmpName.indexOf(",");
+      if (end > 0) {
+          name = tmpName.substring(0, end); // cut at the first comma
+      } else {
+          name = tmpName; // no comma, or a comma at index 0: keep the whole remainder
+      }
+      return name;
+    } else {
+      return principalName; // no CN= marker: the name is returned unchanged
     }
-
-    /*
-    * For SSL session's Kafka creates user names with "CN=" prepended to the user name.
-    * "=" is used as splitter by Sentry to parse key value pairs and so it is required to strip off "CN=".
-    * */
-    private String getName(RequestChannel.Session session) {
-        final String principalName = session.principal().getName();
-        int start = principalName.indexOf("CN=");
-        if (start >= 0) {
-            String tmpName, name = "";
-            tmpName = principalName.substring(start + 3);
-            int end = tmpName.indexOf(",");
-            if (end > 0) {
-                name = tmpName.substring(0, end);
-            } else {
-                name = tmpName;
-            }
-            return name;
-        } else {
-            return principalName;
-        }
+  }
+
+  /**
+   * Initialize kerberos via UserGroupInformation.  Will only attempt to login
+   * during the first request, subsequent calls will have no effect.
+   */
+  private void initKerberos(String keytabFile, String principal) {
+    if (keytabFile == null || keytabFile.length() == 0) { // arguments are validated on every call, before the one-time guard
+      throw new IllegalArgumentException("keytabFile required because kerberos is enabled");
     }
-
-    /**
-     * Initialize kerberos via UserGroupInformation.  Will only attempt to login
-     * during the first request, subsequent calls will have no effect.
-     */
-    private void initKerberos(String keytabFile, String principal) {
-        if (keytabFile == null || keytabFile.length() == 0) {
-            throw new IllegalArgumentException("keytabFile required because kerberos is enabled");
-        }
-        if (principal == null || principal.length() == 0) {
-            throw new IllegalArgumentException("principal required because kerberos is enabled");
-        }
-        synchronized (KafkaAuthBinding.class) {
-            if (kerberosInit == null) {
-                kerberosInit = new Boolean(true);
-                // let's avoid modifying the supplied configuration, just to be conservative
-                final Configuration ugiConf = new Configuration();
-                ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS);
-                UserGroupInformation.setConfiguration(ugiConf);
-                LOG.info(
-                        "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ",
-                        keytabFile, principal);
-                try {
-                    UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
-                } catch (IOException ioe) {
-                    throw new RuntimeException("Failed to login user with Principal: " + principal +
-                            " and Keytab file: " + keytabFile, ioe);
-                }
-                LOG.info("Got Kerberos ticket");
-            }
+    if (principal == null || principal.length() == 0) {
+      throw new IllegalArgumentException("principal required because kerberos is enabled");
+    }
+    synchronized (KafkaAuthBinding.class) { // class-level lock guards the kerberosInit flag
+      if (kerberosInit == null) {
+        kerberosInit = new Boolean(true); // set before the login attempt, so a failed login is not retried by later calls
+        // let's avoid modifying the supplied configuration, just to be conservative
+        final Configuration ugiConf = new Configuration();
+        ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS);
+        UserGroupInformation.setConfiguration(ugiConf);
+        LOG.info(
+                "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ",
+                keytabFile, principal);
+        try {
+          UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
+        } catch (IOException ioe) {
+          throw new RuntimeException("Failed to login user with Principal: " + principal + // wrapped as unchecked, with the IOException kept as the cause
+                    " and Keytab file: " + keytabFile, ioe);
         }
+        LOG.info("Got Kerberos ticket");
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
index 0a57e2e..2f4f8df 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
@@ -17,7 +17,7 @@ package org.apache.sentry.kafka.conf;
 import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine;
+import org.apache.sentry.policy.engine.common.CommonPolicyEngine;
 import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
 import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
 
@@ -41,7 +41,7 @@ public class KafkaAuthConf extends Configuration {
     AUTHZ_PROVIDER("sentry.kafka.provider", HadoopGroupResourceAuthorizationProvider.class.getName()),
     AUTHZ_PROVIDER_RESOURCE("sentry.kafka.provider.resource", ""),
     AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()),
-    AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()),
+    AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", CommonPolicyEngine.class.getName()),
     AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"),
     AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka"),
     AUTHZ_PRINCIPAL_HOSTNAME(KAFKA_PRINCIPAL_HOSTNAME, null),

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
new file mode 100644
index 0000000..086b707
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public abstract class AbstractTestKafkaPolicyEngine { // Shared test cases that check which privilege strings a PolicyEngine returns for fixed group names.
+
+  private static final String ADMIN = "host=*->action=all"; // expected privilege strings, compared verbatim against the engine output
+  private static final String ADMIN_HOST1 = "host=host1->action=all";
+  private static final String CONSUMER_T1_ALL = "host=*->topic=t1->action=read";
+  private static final String CONSUMER_T1_HOST1 = "host=host1->topic=t1->action=read";
+  private static final String CONSUMER_T2_HOST2 = "host=host2->topic=t2->action=read";
+  private static final String PRODUCER_T1_ALL = "host=*->topic=t1->action=write";
+  private static final String PRODUCER_T1_HOST1 = "host=host1->topic=t1->action=write";
+  private static final String PRODUCER_T2_HOST2 = "host=host2->topic=t2->action=write";
+  private static final String CONSUMER_PRODUCER_T1 = "host=host1->topic=t1->action=all";
+
+  private PolicyEngine policy; // engine under test; only ever assigned through setPolicy
+  private static File baseDir; // temporary directory shared by all tests of the class
+
+  @BeforeClass
+  public static void setupClazz() throws IOException {
+    baseDir = Files.createTempDir();
+  }
+
+  @AfterClass
+  public static void teardownClazz() throws IOException {
+    if (baseDir != null) {
+      FileUtils.deleteQuietly(baseDir); // best-effort cleanup of the temporary directory
+    }
+  }
+
+  protected void setPolicy(PolicyEngine policy) { // presumably called by subclasses from afterSetup - otherwise policy stays null
+    this.policy = policy;
+  }
+
+  protected static File getBaseDir() {
+    return baseDir;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    afterSetup(); // per-test hook for subclasses
+  }
+
+  @After
+  public void teardown() throws IOException {
+    beforeTeardown(); // per-test hook for subclasses
+  }
+
+  protected void afterSetup() throws IOException {} // no-op by default
+
+  protected void beforeTeardown() throws IOException {} // no-op by default
+
+
+  @Test
+  public void testConsumer0() throws Exception { // every test compares the toString() of the sorted expected and actual privilege sets
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_ALL));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("consumer_group0"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testConsumer1() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_HOST1));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("consumer_group1"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testConsumer2() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T2_HOST2));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("consumer_group2"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testProducer0() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_ALL));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("producer_group0"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testProducer1() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_HOST1));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("producer_group1"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+
+  @Test
+  public void testProducer2() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T2_HOST2));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("producer_group2"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testConsumerProducer0() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_PRODUCER_T1));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("consumer_producer_group0"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testSubAdmin() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN_HOST1));
+    Assert.assertEquals(expected.toString(),
+        new TreeSet<String>(policy.getPrivileges(set("subadmin_group"), ActiveRoleSet.ALL))
+            .toString());
+  }
+
+  @Test
+  public void testAdmin() throws Exception {
+    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN));
+    Assert
+        .assertEquals(expected.toString(),
+            new TreeSet<String>(policy.getPrivileges(set("admin_group"), ActiveRoleSet.ALL))
+                .toString());
+  }
+
+  private static Set<String> set(String... values) { // shorthand for building the group-name set passed to getPrivileges
+    return Sets.newHashSet(values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java
new file mode 100644
index 0000000..1ededbd
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.policy.kafka;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.policy.engine.common.CommonPolicyEngine;
+import org.apache.sentry.provider.common.ProviderBackend;
+import org.apache.sentry.provider.common.ProviderBackendContext;
+import org.apache.sentry.provider.file.SimpleFileProviderBackend;
+
+import java.io.IOException;
+
+public class KafkaPolicyTestUtil { // Test helper that wires a CommonPolicyEngine to a SimpleFileProviderBackend.
+
+  public static PolicyEngine createPolicyEngineForTest(String resource) throws IOException { // resource is handed to SimpleFileProviderBackend as-is - presumably the policy file location, TODO confirm
+
+    ProviderBackend providerBackend = new SimpleFileProviderBackend(new Configuration(), resource);
+
+    // create backendContext
+    ProviderBackendContext context = new ProviderBackendContext();
+    context.setAllowPerDatabase(false);
+    context.setValidators(KafkaPrivilegeModel.getInstance().getPrivilegeValidators()); // validators come from the Kafka privilege model
+    // initialize the backend with the context
+    providerBackend.initialize(context);
+
+
+    return new CommonPolicyEngine(providerBackend); // the engine wraps the already initialized backend
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
new file mode 100644
index 0000000..572c74d
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.util.Set;
+
+import org.apache.sentry.provider.common.GroupMappingService;
+
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+public class MockGroupMappingServiceProvider implements GroupMappingService { // GroupMappingService for tests, backed by a user-to-groups Multimap supplied by the caller.
+  private final Multimap<String, String> userToGroupMap; // the reference is stored as-is, not copied
+
+  public MockGroupMappingServiceProvider(Multimap<String, String> userToGroupMap) {
+    this.userToGroupMap = userToGroupMap;
+  }
+  @Override
+  public Set<String> getGroups(String user) {
+    return Sets.newHashSet(userToGroupMap.get(user)); // fresh HashSet copy; Multimap.get yields an empty collection for an unknown key, so the result is empty rather than null
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
new file mode 100644
index 0000000..07f4d7d
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.Action;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.Subject;
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.ConsumerGroup;
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
+import org.apache.sentry.provider.common.ResourceAuthorizationProvider;
+import org.apache.sentry.provider.file.PolicyFiles;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public class TestKafkaAuthorizationProviderGeneralCases {
+  private static final Multimap<String, String> USER_TO_GROUP_MAP = HashMultimap.create();
+
+  private static final Host HOST_1 = new Host("host1");
+  private static final Host HOST_2 = new Host("host2");
+  private static final Cluster cluster1 = new Cluster();
+  private static final Topic topic1 = new Topic("t1");
+  private static final Topic topic2 = new Topic("t2");
+  private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1");
+  private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2");
+
+  private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL);
+  private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ);
+  private static final KafkaAction WRITE = new KafkaAction(KafkaActionConstant.WRITE);
+  private static final KafkaAction CREATE = new KafkaAction(KafkaActionConstant.CREATE);
+  private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE);
+  private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER);
+  private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE);
+  private static final KafkaAction CLUSTER_ACTION = new KafkaAction(
+      KafkaActionConstant.CLUSTER_ACTION);
+
+  private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION);
+
+  private static final Subject ADMIN = new Subject("admin1");
+  private static final Subject SUB_ADMIN = new Subject("subadmin1");
+  private static final Subject CONSUMER0 = new Subject("consumer0");
+  private static final Subject CONSUMER1 = new Subject("consumer1");
+  private static final Subject CONSUMER2 = new Subject("consumer2");
+  private static final Subject PRODUCER0 = new Subject("producer0");
+  private static final Subject PRODUCER1 = new Subject("producer1");
+  private static final Subject PRODUCER2 = new Subject("producer2");
+  private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0");
+
+  private static final String ADMIN_GROUP  = "admin_group";
+  private static final String SUBADMIN_GROUP  = "subadmin_group";
+  private static final String CONSUMER_GROUP0 = "consumer_group0";
+  private static final String CONSUMER_GROUP1 = "consumer_group1";
+  private static final String CONSUMER_GROUP2 = "consumer_group2";
+  private static final String PRODUCER_GROUP0 = "producer_group0";
+  private static final String PRODUCER_GROUP1 = "producer_group1";
+  private static final String PRODUCER_GROUP2 = "producer_group2";
+  private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0";
+
+  static {
+    USER_TO_GROUP_MAP.putAll(ADMIN.getName(), Arrays.asList(ADMIN_GROUP));
+    USER_TO_GROUP_MAP.putAll(SUB_ADMIN.getName(),  Arrays.asList(SUBADMIN_GROUP ));
+    USER_TO_GROUP_MAP.putAll(CONSUMER0.getName(),  Arrays.asList(CONSUMER_GROUP0));
+    USER_TO_GROUP_MAP.putAll(CONSUMER1.getName(),  Arrays.asList(CONSUMER_GROUP1));
+    USER_TO_GROUP_MAP.putAll(CONSUMER2.getName(),  Arrays.asList(CONSUMER_GROUP2));
+    USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(),  Arrays.asList(PRODUCER_GROUP0));
+    USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(),  Arrays.asList(PRODUCER_GROUP1));
+    USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(),  Arrays.asList(PRODUCER_GROUP2));
+    USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(),  Arrays.asList(CONSUMER_PRODUCER_GROUP0));
+  }
+
+  private final ResourceAuthorizationProvider authzProvider;
+  private File baseDir;
+
+  public TestKafkaAuthorizationProviderGeneralCases() throws IOException {
+    baseDir = Files.createTempDir();
+    PolicyFiles.copyToDir(baseDir, "kafka-policy-test-authz-provider.ini");
+    authzProvider = new HadoopGroupResourceAuthorizationProvider(
+        KafkaPolicyTestUtil.createPolicyEngineForTest(new File(baseDir,
+            "kafka-policy-test-authz-provider.ini").getPath()),
+        new MockGroupMappingServiceProvider(USER_TO_GROUP_MAP), KafkaPrivilegeModel.getInstance());
+  }
+
+  @After
+  public void teardown() {
+    if(baseDir != null) {
+      FileUtils.deleteQuietly(baseDir);
+    }
+  }
+
+  private void doTestResourceAuthorizationProvider(Subject subject, List<? extends Authorizable> authorizableHierarchy,
+      Set<? extends Action> actions, boolean expected) throws Exception {
+    Objects.ToStringHelper helper = Objects.toStringHelper("TestParameters");
+    helper.add("Subject", subject).add("authzHierarchy", authorizableHierarchy).add("action", actions);
+    Assert.assertEquals(helper.toString(), expected,
+        authzProvider.hasAccess(subject, authorizableHierarchy, actions, ActiveRoleSet.ALL));
+  }
+
+  @Test
+  public void testAdmin() throws Exception {
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1), allActions, true);
+
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cluster1), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic1), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic2), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, false);
+    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2), allActions, false);
+
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1), allActions, true);
+
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cluster1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic2), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, true);
+    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2), allActions, true);
+  }
+
+  @Test
+  public void testConsumer() throws Exception {
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(CONSUMER0, Arrays.asList(host, topic1),
+            Sets.newHashSet(action), READ.equals(action));
+      }
+    }
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1),
+            Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action));
+      }
+    }
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2),
+            Sets.newHashSet(action), HOST_2.equals(host) && READ.equals(action));
+      }
+    }
+  }
+
+  @Test
+  public void testProducer() throws Exception {
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(PRODUCER0, Arrays.asList(host, topic1),
+            Sets.newHashSet(action), WRITE.equals(action));
+      }
+    }
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1),
+            Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
+      }
+    }
+    for (KafkaAction action : allActions) {
+      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+        doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2),
+            Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
+      }
+    }
+  }
+
+  @Test
+  public void testConsumerProducer() throws Exception {
+    for (KafkaAction action : allActions) {
+      doTestResourceAuthorizationProvider(CONSUMER_PRODUCER0, Arrays.asList(HOST_1, topic1),
+          Sets.newHashSet(action), true);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java
new file mode 100644
index 0000000..63d2f30
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.Action;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.Subject;
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.provider.common.AuthorizationProvider;
+import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider;
+import org.apache.sentry.provider.file.PolicyFile;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public class TestKafkaAuthorizationProviderSpecialCases {
+  private AuthorizationProvider authzProvider;
+  private PolicyFile policyFile;
+  private File baseDir;
+  private File iniFile;
+  private String initResource;
+  @Before
+  public void setup() throws IOException {
+    baseDir = Files.createTempDir();
+    iniFile = new File(baseDir, "policy.ini");
+    initResource = "file://" + iniFile.getPath();
+    policyFile = new PolicyFile();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if(baseDir != null) {
+      FileUtils.deleteQuietly(baseDir);
+    }
+  }
+
+  @Test
+  public void testDuplicateEntries() throws Exception {
+    Subject user1 = new Subject("user1");
+    Host host1 = new Host("host1");
+    Topic topic1 = new Topic("t1");
+    Set<? extends Action> actions = Sets.newHashSet(new KafkaAction(KafkaActionConstant.READ));
+    policyFile.addGroupsToUser(user1.getName(), true, "group1", "group1")
+      .addRolesToGroup("group1",  true, "role1", "role1")
+      .addPermissionsToRole("role1", true, "host=host1->topic=t1->action=read",
+          "host=host1->topic=t1->action=read");
+    policyFile.write(iniFile);
+    PolicyEngine policy = KafkaPolicyTestUtil.createPolicyEngineForTest(initResource);
+    authzProvider = new LocalGroupResourceAuthorizationProvider(initResource,
+        policy, KafkaPrivilegeModel.getInstance());
+    List<? extends Authorizable> authorizableHierarchy = ImmutableList.of(host1, topic1);
+    Assert.assertTrue(authorizableHierarchy.toString(),
+        authzProvider.hasAccess(user1, authorizableHierarchy, actions, ActiveRoleSet.ALL));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
new file mode 100644
index 0000000..62fbea7
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.fail;
+
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.ConsumerGroup;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.KafkaModelAuthorizables;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.shiro.config.ConfigurationException;
+import org.junit.Test;
+
+/**
+ * Tests how {@code KafkaModelAuthorizables.from} turns {@code key=value}
+ * strings into Kafka authorizable objects.
+ */
+public class TestKafkaModelAuthorizables {
+
+  /** A {@code HOST=} entry yields a {@link Host} carrying the given name. */
+  @Test
+  public void testHost() throws Exception {
+    Host host1 = (Host) KafkaModelAuthorizables.from("HOST=host1");
+    assertEquals("host1", host1.getName());
+  }
+
+  /**
+   * Input without a {@code =} separator must be rejected. The println is only
+   * reached if no exception is thrown, in which case it shows what was returned.
+   */
+  @Test(expected=IllegalArgumentException.class)
+  public void testNoKV() throws Exception {
+    System.out.println(KafkaModelAuthorizables.from("nonsense"));
+  }
+
+  /** Input with an empty key must be rejected. */
+  @Test(expected=IllegalArgumentException.class)
+  public void testEmptyKey() throws Exception {
+    System.out.println(KafkaModelAuthorizables.from("=host1"));
+  }
+
+  /** Input with an empty value must be rejected. */
+  @Test(expected=IllegalArgumentException.class)
+  public void testEmptyValue() throws Exception {
+    System.out.println(KafkaModelAuthorizables.from("HOST="));
+  }
+
+  /** The well-formed pair {@code k=v} is expected to yield {@code null}. */
+  @Test
+  public void testNotAuthorizable() throws Exception {
+    assertNull(KafkaModelAuthorizables.from("k=v"));
+  }
+
+  /**
+   * Resource names keep the case they were written in; the keys in this test
+   * are deliberately given in differing case.
+   */
+  @Test
+  public void testResourceNameIsCaseSensitive() throws Exception {
+    Host host1 = (Host)KafkaModelAuthorizables.from("HOST=Host1");
+    assertEquals("Host1", host1.getName());
+
+    Cluster cluster1 = (Cluster)KafkaModelAuthorizables.from("Cluster=kafka-cluster");
+    assertEquals("kafka-cluster", cluster1.getName());
+
+    Topic topic1 = (Topic)KafkaModelAuthorizables.from("topic=topiC1");
+    assertEquals("topiC1", topic1.getName());
+
+    ConsumerGroup consumergroup1 = (ConsumerGroup)KafkaModelAuthorizables.from("ConsumerGroup=CG1");
+    assertEquals("CG1", consumergroup1.getName());
+  }
+
+  /**
+   * A cluster entry named {@code cluster1} must be rejected with a
+   * {@link ConfigurationException} whose message names {@link Cluster#NAME}.
+   */
+  @Test
+  public void testClusterResourceNameIsRestricted() throws Exception {
+    try {
+      KafkaModelAuthorizables.from("Cluster=cluster1");
+      fail("Cluster with name other than " + Cluster.NAME + " must not have been created.");
+    } catch (ConfigurationException cex) {
+      assertEquals("Exception message is not as expected.", "Kafka's cluster resource can only have name " + Cluster.NAME, cex.getMessage());
+    } catch (Exception ex) {
+      // Include the exception that was actually thrown; without it the failure
+      // report gives no hint about what went wrong.
+      fail("Configuration exception was expected for invalid Cluster name, but got: " + ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java
new file mode 100644
index 0000000..4299b1f
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.sentry.provider.file.PolicyFiles;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestKafkaPolicyEngineDFS extends AbstractTestKafkaPolicyEngine {
+  private static MiniDFSCluster dfsCluster;
+  private static FileSystem fileSystem;
+  private static Path root;
+  private static Path etc;
+
+  @BeforeClass
+  public static void setupLocalClazz() throws IOException {
+    File baseDir = getBaseDir();
+    Assert.assertNotNull(baseDir);
+    File dfsDir = new File(baseDir, "dfs");
+    Assert.assertTrue(dfsDir.isDirectory() || dfsDir.mkdirs());
+    Configuration conf = new Configuration();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath());
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    fileSystem = dfsCluster.getFileSystem();
+    root = new Path(fileSystem.getUri().toString());
+    etc = new Path(root, "/etc");
+    fileSystem.mkdirs(etc);
+  }
+
+  @AfterClass
+  public static void teardownLocalClazz() {
+    if(dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
+
+  @Override
+  protected void afterSetup() throws IOException {
+    fileSystem.delete(etc, true);
+    fileSystem.mkdirs(etc);
+    PolicyFiles.copyToDir(fileSystem, etc, "kafka-policy-test-authz-provider.ini");
+    setPolicy(KafkaPolicyTestUtil.createPolicyEngineForTest(new Path(etc,
+        "kafka-policy-test-authz-provider.ini").toString()));
+  }
+
+  @Override
+  protected void beforeTeardown() throws IOException {
+    fileSystem.delete(etc, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java
new file mode 100644
index 0000000..9a69f1c
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.provider.file.PolicyFiles;
+
+public class TestKafkaPolicyEngineLocalFS extends AbstractTestKafkaPolicyEngine {
+
+  @Override
+  protected void  afterSetup() throws IOException {
+    File baseDir = getBaseDir();
+    Assert.assertNotNull(baseDir);
+    Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
+    PolicyFiles.copyToDir(baseDir, "kafka-policy-test-authz-provider.ini");
+    setPolicy(KafkaPolicyTestUtil.createPolicyEngineForTest(new File(baseDir,
+        "kafka-policy-test-authz-provider.ini").getPath()));
+  }
+
+  @Override
+  protected void beforeTeardown() throws IOException {
+    File baseDir = getBaseDir();
+    Assert.assertNotNull(baseDir);
+    FileUtils.deleteQuietly(baseDir);
+  }
+}