You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by co...@apache.org on 2016/04/22 08:28:30 UTC
[10/13] sentry git commit: SENTRY-999: Refactor the sentry to
integrate with external components quickly (Colin Ma, reviewed by Dapeng Sun)
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index c6600a0..15f7359 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -42,16 +42,19 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.sentry.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.Model;
import org.apache.sentry.core.common.Subject;
import org.apache.sentry.core.model.kafka.KafkaActionFactory;
import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
import org.apache.sentry.kafka.ConvertUtil;
import org.apache.sentry.kafka.conf.KafkaAuthConf.AuthzConfVars;
import org.apache.sentry.policy.common.PolicyEngine;
import org.apache.sentry.provider.common.AuthorizationComponent;
import org.apache.sentry.provider.common.AuthorizationProvider;
import org.apache.sentry.provider.common.ProviderBackend;
+import org.apache.sentry.provider.common.ProviderBackendContext;
import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
@@ -72,491 +75,497 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
public class KafkaAuthBinding {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class);
- private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA;
- private static final String COMPONENT_NAME = COMPONENT_TYPE;
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class);
+ private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA;
+ private static final String COMPONENT_NAME = COMPONENT_TYPE;
- private static Boolean kerberosInit;
+ private static Boolean kerberosInit;
- private final Configuration authConf;
- private final AuthorizationProvider authProvider;
- private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
+ private final Configuration authConf;
+ private final AuthorizationProvider authProvider;
+ private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
- private ProviderBackend providerBackend;
- private String instanceName;
- private String requestorName;
- private java.util.Map<String, ?> kafkaConfigs;
+ private ProviderBackend providerBackend;
+ private String instanceName;
+ private String requestorName;
+ private java.util.Map<String, ?> kafkaConfigs;
- public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception {
- this.instanceName = instanceName;
- this.requestorName = requestorName;
- this.authConf = authConf;
- this.kafkaConfigs = kafkaConfigs;
- this.authProvider = createAuthProvider();
- }
+ public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception {
+ this.instanceName = instanceName;
+ this.requestorName = requestorName;
+ this.authConf = authConf;
+ this.kafkaConfigs = kafkaConfigs;
+ this.authProvider = createAuthProvider();
+ }
+ /**
+ * Instantiate the configured authz provider
+ *
+ * @return {@link AuthorizationProvider}
+ */
+ private AuthorizationProvider createAuthProvider() throws Exception {
/**
- * Instantiate the configured authz provider
- *
- * @return {@link AuthorizationProvider}
+ * get the authProvider class, policyEngine class, providerBackend class and resources from the
+ * kafkaAuthConf config
*/
- private AuthorizationProvider createAuthProvider() throws Exception {
- /**
- * get the authProvider class, policyEngine class, providerBackend class and resources from the
- * kafkaAuthConf config
- */
- String authProviderName =
- authConf.get(AuthzConfVars.AUTHZ_PROVIDER.getVar(),
- AuthzConfVars.AUTHZ_PROVIDER.getDefault());
- String resourceName =
- authConf.get(AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(),
- AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getDefault());
- String providerBackendName =
- authConf.get(AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(),
- AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getDefault());
- String policyEngineName =
- authConf.get(AuthzConfVars.AUTHZ_POLICY_ENGINE.getVar(),
- AuthzConfVars.AUTHZ_POLICY_ENGINE.getDefault());
- if (resourceName != null && resourceName.startsWith("classpath:")) {
- String resourceFileName = resourceName.substring("classpath:".length());
- resourceName = AuthorizationProvider.class.getClassLoader().getResource(resourceFileName).getPath();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using authorization provider " + authProviderName + " with resource "
- + resourceName + ", policy engine " + policyEngineName + ", provider backend "
- + providerBackendName);
- }
-
- // Initiate kerberos via UserGroupInformation if required
- if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE))
- && kafkaConfigs != null) {
- String keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString();
- String principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString();
- if (keytabProp != null && principalProp != null) {
- String actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar()).toString();
- if (actualHost != null) {
- principalProp = SecurityUtil.getServerPrincipal(principalProp, actualHost);
- }
- initKerberos(keytabProp, principalProp);
- } else {
- LOG.debug("Could not initialize Kerberos.\n" +
- AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString() + "\n" +
- AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString());
- }
- } else {
- LOG.debug("Could not initialize Kerberos as no kafka config provided. " +
- AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() +
- " are required configs to be able to initialize Kerberos");
- }
-
- // Instantiate the configured providerBackend
- Constructor<?> providerBackendConstructor =
- Class.forName(providerBackendName)
- .getDeclaredConstructor(Configuration.class, String.class);
- providerBackendConstructor.setAccessible(true);
- providerBackend =
- (ProviderBackend) providerBackendConstructor.newInstance(new Object[]{authConf,
- resourceName});
- if (providerBackend instanceof SentryGenericProviderBackend) {
- ((SentryGenericProviderBackend) providerBackend).setComponentType(COMPONENT_TYPE);
- ((SentryGenericProviderBackend) providerBackend).setServiceName(instanceName);
- }
-
- // Instantiate the configured policyEngine
- Constructor<?> policyConstructor =
- Class.forName(policyEngineName).getDeclaredConstructor(ProviderBackend.class);
- policyConstructor.setAccessible(true);
- PolicyEngine policyEngine =
- (PolicyEngine) policyConstructor.newInstance(new Object[]{providerBackend});
-
- // Instantiate the configured authProvider
- Constructor<?> constructor =
- Class.forName(authProviderName).getDeclaredConstructor(Configuration.class, String.class,
- PolicyEngine.class);
- constructor.setAccessible(true);
- return (AuthorizationProvider) constructor.newInstance(new Object[]{authConf, resourceName,
- policyEngine});
+ String authProviderName =
+ authConf.get(AuthzConfVars.AUTHZ_PROVIDER.getVar(),
+ AuthzConfVars.AUTHZ_PROVIDER.getDefault());
+ String resourceName =
+ authConf.get(AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(),
+ AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getDefault());
+ String providerBackendName =
+ authConf.get(AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(),
+ AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getDefault());
+ String policyEngineName =
+ authConf.get(AuthzConfVars.AUTHZ_POLICY_ENGINE.getVar(),
+ AuthzConfVars.AUTHZ_POLICY_ENGINE.getDefault());
+ if (resourceName != null && resourceName.startsWith("classpath:")) {
+ String resourceFileName = resourceName.substring("classpath:".length());
+ resourceName = AuthorizationProvider.class.getClassLoader().getResource(resourceFileName).getPath();
}
-
- /**
- * Authorize access to a Kafka privilege
- */
- public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
- List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(session.clientAddress().getHostAddress(), resource);
- Set<KafkaAction> actions = Sets.newHashSet(actionFactory.getActionByName(operation.name()));
- return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using authorization provider " + authProviderName + " with resource "
+ + resourceName + ", policy engine " + policyEngineName + ", provider backend "
+ + providerBackendName);
}
- public void addAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
- verifyAcls(acls);
- LOG.info("Adding Acl: acl->" + acls + " resource->" + resource);
-
- final Iterator<Acl> iterator = acls.iterator();
- while (iterator.hasNext()) {
- final Acl acl = iterator.next();
- final String role = getRole(acl);
- if (!roleExists(role)) {
- throw new KafkaException("Can not add Acl for non-existent Role: " + role);
- }
- execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- client.grantPrivilege(
- requestorName, role, COMPONENT_NAME, toTSentryPrivilege(acl, resource));
- return null;
- }
- });
- }
+ // Initiate kerberos via UserGroupInformation if required
+ if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE))
+ && kafkaConfigs != null) {
+ String keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString();
+ String principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString();
+ if (keytabProp != null && principalProp != null) {
+ String actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar()).toString();
+ if (actualHost != null) {
+ principalProp = SecurityUtil.getServerPrincipal(principalProp, actualHost);
+ }
+ initKerberos(keytabProp, principalProp);
+ } else {
+ LOG.debug("Could not initialize Kerberos.\n" +
+ AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString() + "\n" +
+ AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString());
+ }
+ } else {
+ LOG.debug("Could not initialize Kerberos as no kafka config provided. " +
+ AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() +
+ " are required configs to be able to initialize Kerberos");
}
- public boolean removeAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
- verifyAcls(acls);
- LOG.info("Removing Acl: acl->" + acls + " resource->" + resource);
- final Iterator<Acl> iterator = acls.iterator();
- while (iterator.hasNext()) {
- final Acl acl = iterator.next();
- final String role = getRole(acl);
- try {
- execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- client.dropPrivilege(
- requestorName, role, toTSentryPrivilege(acl, resource));
- return null;
- }
- });
- } catch (KafkaException kex) {
- LOG.error("Failed to remove acls.", kex);
- return false;
- }
- }
-
- return true;
+ // Instantiate the configured providerBackend
+ Constructor<?> providerBackendConstructor =
+ Class.forName(providerBackendName)
+ .getDeclaredConstructor(Configuration.class, String.class);
+ providerBackendConstructor.setAccessible(true);
+ providerBackend =
+ (ProviderBackend) providerBackendConstructor.newInstance(new Object[]{authConf,
+ resourceName});
+ if (providerBackend instanceof SentryGenericProviderBackend) {
+ ((SentryGenericProviderBackend) providerBackend).setComponentType(COMPONENT_TYPE);
+ ((SentryGenericProviderBackend) providerBackend).setServiceName(instanceName);
}
- public void addRole(final String role) {
- if (roleExists(role)) {
- throw new KafkaException("Can not create an existing role, " + role + ", again.");
+ // Create backend context
+ ProviderBackendContext context = new ProviderBackendContext();
+ context.setAllowPerDatabase(false);
+ context.setValidators(KafkaPrivilegeModel.getInstance().getPrivilegeValidators());
+ providerBackend.initialize(context);
+
+ // Instantiate the configured policyEngine
+ Constructor<?> policyConstructor =
+ Class.forName(policyEngineName).getDeclaredConstructor(ProviderBackend.class);
+ policyConstructor.setAccessible(true);
+ PolicyEngine policyEngine =
+ (PolicyEngine) policyConstructor.newInstance(new Object[]{providerBackend});
+
+ // Instantiate the configured authProvider
+ Constructor<?> constructor =
+ Class.forName(authProviderName).getDeclaredConstructor(Configuration.class, String.class,
+ PolicyEngine.class, Model.class);
+ constructor.setAccessible(true);
+ return (AuthorizationProvider) constructor.newInstance(new Object[]{authConf, resourceName,
+ policyEngine, KafkaPrivilegeModel.getInstance()});
+ }
+
+ /**
+ * Authorize access to a Kafka privilege
+ */
+ public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
+ List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(session.clientAddress().getHostAddress(), resource);
+ Set<KafkaAction> actions = Sets.newHashSet(actionFactory.getActionByName(operation.name()));
+ return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL);
+ }
+
+ public void addAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
+ verifyAcls(acls);
+ LOG.info("Adding Acl: acl->" + acls + " resource->" + resource);
+
+ final Iterator<Acl> iterator = acls.iterator();
+ while (iterator.hasNext()) {
+ final Acl acl = iterator.next();
+ final String role = getRole(acl);
+ if (!roleExists(role)) {
+ throw new KafkaException("Can not add Acl for non-existent Role: " + role);
+ }
+ execute(new Command<Void>() {
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ client.grantPrivilege(
+ requestorName, role, COMPONENT_NAME, toTSentryPrivilege(acl, resource));
+ return null;
}
-
- execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- client.createRole(
- requestorName, role, COMPONENT_NAME);
- return null;
- }
- });
- }
-
- public void addRoleToGroups(final String role, final java.util.Set<String> groups) {
- execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- client.addRoleToGroups(
- requestorName, role, COMPONENT_NAME, groups);
- return null;
- }
- });
+ });
}
-
- public void dropAllRoles() {
- final List<String> roles = getAllRoles();
+ }
+
+ public boolean removeAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
+ verifyAcls(acls);
+ LOG.info("Removing Acl: acl->" + acls + " resource->" + resource);
+ final Iterator<Acl> iterator = acls.iterator();
+ while (iterator.hasNext()) {
+ final Acl acl = iterator.next();
+ final String role = getRole(acl);
+ try {
execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- for (String role : roles) {
- client.dropRole(requestorName, role, COMPONENT_NAME);
- }
- return null;
- }
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ client.dropPrivilege(
+ requestorName, role, toTSentryPrivilege(acl, resource));
+ return null;
+ }
});
+ } catch (KafkaException kex) {
+ LOG.error("Failed to remove acls.", kex);
+ return false;
+ }
}
- private List<String> getRolesforGroup(final String groupName) {
- final List<String> roles = new ArrayList<>();
- execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- for (TSentryRole tSentryRole : client.listRolesByGroupName(requestorName, groupName, COMPONENT_NAME)) {
- roles.add(tSentryRole.getRoleName());
- }
- return null;
- }
- });
-
- return roles;
- }
+ return true;
+ }
- private SentryGenericServiceClient getClient() throws Exception {
- return SentryGenericServiceClientFactory.create(this.authConf);
+ public void addRole(final String role) {
+ if (roleExists(role)) {
+ throw new KafkaException("Can not create an existing role, " + role + ", again.");
}
- public boolean removeAcls(final Resource resource) {
- LOG.info("Removing Acls for Resource: resource->" + resource);
- List<String> roles = getAllRoles();
- final List<TSentryPrivilege> tSentryPrivileges = getAllPrivileges(roles);
- try {
- execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- for (TSentryPrivilege tSentryPrivilege : tSentryPrivileges) {
- if (isPrivilegeForResource(tSentryPrivilege, resource)) {
- client.dropPrivilege(
- requestorName, COMPONENT_NAME, tSentryPrivilege);
- }
- }
- return null;
- }
- });
- } catch (KafkaException kex) {
- LOG.error("Failed to remove acls.", kex);
- return false;
+ execute(new Command<Void>() {
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ client.createRole(
+ requestorName, role, COMPONENT_NAME);
+ return null;
+ }
+ });
+ }
+
+ public void addRoleToGroups(final String role, final java.util.Set<String> groups) {
+ execute(new Command<Void>() {
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ client.addRoleToGroups(
+ requestorName, role, COMPONENT_NAME, groups);
+ return null;
+ }
+ });
+ }
+
+ public void dropAllRoles() {
+ final List<String> roles = getAllRoles();
+ execute(new Command<Void>() {
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ for (String role : roles) {
+ client.dropRole(requestorName, role, COMPONENT_NAME);
}
-
- return true;
- }
-
- public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) {
- final Option<scala.collection.immutable.Set<Acl>> acls = getAcls().get(resource);
- if (acls.nonEmpty())
- return acls.get();
- return new scala.collection.immutable.HashSet<Acl>();
- }
-
- public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal principal) {
- if (principal.getPrincipalType().toLowerCase().equals("group")) {
- List<String> roles = getRolesforGroup(principal.getName());
- return getAclsForRoles(roles);
- } else {
- LOG.info("Did not recognize Principal type: " + principal.getPrincipalType() + ". Returning Acls for all principals.");
- return getAcls();
+ return null;
+ }
+ });
+ }
+
+ private List<String> getRolesforGroup(final String groupName) {
+ final List<String> roles = new ArrayList<>();
+ execute(new Command<Void>() {
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ for (TSentryRole tSentryRole : client.listRolesByGroupName(requestorName, groupName, COMPONENT_NAME)) {
+ roles.add(tSentryRole.getRoleName());
}
- }
-
- public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() {
- final List<String> roles = getAllRoles();
- return getAclsForRoles(roles);
- }
-
- /**
- * A Command is a closure used to pass a block of code from individual
- * functions to execute, which centralizes connection error
- * handling. Command is parameterized on the return type of the function.
- */
- private interface Command<T> {
- T run(SentryGenericServiceClient client) throws Exception;
- }
-
- private <T> T execute(Command<T> cmd) throws KafkaException {
- SentryGenericServiceClient client = null;
- try {
- client = getClient();
- return cmd.run(client);
- } catch (SentryUserException ex) {
- String msg = "Unable to excute command on sentry server: " + ex.getMessage();
- LOG.error(msg, ex);
- throw new KafkaException(msg, ex);
- } catch (Exception ex) {
- String msg = "Unable to obtain client:" + ex.getMessage();
- LOG.error(msg, ex);
- throw new KafkaException(msg, ex);
- } finally {
- if (client != null) {
- client.close();
+ return null;
+ }
+ });
+
+ return roles;
+ }
+
+ private SentryGenericServiceClient getClient() throws Exception {
+ return SentryGenericServiceClientFactory.create(this.authConf);
+ }
+
+ public boolean removeAcls(final Resource resource) {
+ LOG.info("Removing Acls for Resource: resource->" + resource);
+ List<String> roles = getAllRoles();
+ final List<TSentryPrivilege> tSentryPrivileges = getAllPrivileges(roles);
+ try {
+ execute(new Command<Void>() {
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ for (TSentryPrivilege tSentryPrivilege : tSentryPrivileges) {
+ if (isPrivilegeForResource(tSentryPrivilege, resource)) {
+ client.dropPrivilege(
+ requestorName, COMPONENT_NAME, tSentryPrivilege);
}
+ }
+ return null;
}
+ });
+ } catch (KafkaException kex) {
+ LOG.error("Failed to remove acls.", kex);
+ return false;
}
- private TSentryPrivilege toTSentryPrivilege(Acl acl, Resource resource) {
- final List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(acl.host(), resource);
- final List<TAuthorizable> tAuthorizables = new ArrayList<>();
- for (Authorizable authorizable : authorizables) {
- tAuthorizables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName()));
- }
- TSentryPrivilege tSentryPrivilege = new TSentryPrivilege(COMPONENT_NAME, instanceName, tAuthorizables, acl.operation().name());
- return tSentryPrivilege;
- }
-
- private String getRole(Acl acl) {
- return acl.principal().getName();
- }
-
- private boolean isPrivilegeForResource(TSentryPrivilege tSentryPrivilege, Resource resource) {
- final java.util.Iterator<TAuthorizable> authorizablesIterator = tSentryPrivilege.getAuthorizablesIterator();
- while (authorizablesIterator.hasNext()) {
- TAuthorizable tAuthorizable = authorizablesIterator.next();
- if (tAuthorizable.getType().equals(resource.resourceType().name())) {
- return true;
- }
- }
- return false;
+ return true;
+ }
+
+ public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) {
+ final Option<scala.collection.immutable.Set<Acl>> acls = getAcls().get(resource);
+ if (acls.nonEmpty())
+ return acls.get();
+ return new scala.collection.immutable.HashSet<Acl>();
+ }
+
+ public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal principal) {
+ if (principal.getPrincipalType().toLowerCase().equals("group")) {
+ List<String> roles = getRolesforGroup(principal.getName());
+ return getAclsForRoles(roles);
+ } else {
+ LOG.info("Did not recognize Principal type: " + principal.getPrincipalType() + ". Returning Acls for all principals.");
+ return getAcls();
}
-
- private List<TSentryPrivilege> getAllPrivileges(final List<String> roles) {
- final List<TSentryPrivilege> tSentryPrivileges = new ArrayList<>();
- execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- for (String role : roles) {
- tSentryPrivileges.addAll(client.listPrivilegesByRoleName(
- requestorName, role, COMPONENT_NAME, instanceName));
- }
- return null;
- }
- });
-
- return tSentryPrivileges;
+ }
+
+ public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() {
+ final List<String> roles = getAllRoles();
+ return getAclsForRoles(roles);
+ }
+
+ /**
+ * A Command is a closure used to pass a block of code from individual
+ * functions to execute, which centralizes connection error
+ * handling. Command is parameterized on the return type of the function.
+ */
+ private interface Command<T> {
+ T run(SentryGenericServiceClient client) throws Exception;
+ }
+
+ private <T> T execute(Command<T> cmd) throws KafkaException {
+ SentryGenericServiceClient client = null;
+ try {
+ client = getClient();
+ return cmd.run(client);
+ } catch (SentryUserException ex) {
+ String msg = "Unable to excute command on sentry server: " + ex.getMessage();
+ LOG.error(msg, ex);
+ throw new KafkaException(msg, ex);
+ } catch (Exception ex) {
+ String msg = "Unable to obtain client:" + ex.getMessage();
+ LOG.error(msg, ex);
+ throw new KafkaException(msg, ex);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
}
+ }
- private List<String> getAllRoles() {
- final List<String> roles = new ArrayList<>();
- execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- for (TSentryRole tSentryRole : client.listAllRoles(requestorName, COMPONENT_NAME)) {
- roles.add(tSentryRole.getRoleName());
- }
- return null;
- }
- });
-
- return roles;
+ private TSentryPrivilege toTSentryPrivilege(Acl acl, Resource resource) {
+ final List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(acl.host(), resource);
+ final List<TAuthorizable> tAuthorizables = new ArrayList<>();
+ for (Authorizable authorizable : authorizables) {
+ tAuthorizables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName()));
}
-
- private Map<Resource, scala.collection.immutable.Set<Acl>> getAclsForRoles(final List<String> roles) {
- return scala.collection.JavaConverters.mapAsScalaMapConverter(
- rolePrivilegesToResourceAcls(getRoleToPrivileges(roles)))
- .asScala().toMap(Predef.<Tuple2<Resource, scala.collection.immutable.Set<Acl>>>conforms());
+ TSentryPrivilege tSentryPrivilege = new TSentryPrivilege(COMPONENT_NAME, instanceName, tAuthorizables, acl.operation().name());
+ return tSentryPrivilege;
+ }
+
+ private String getRole(Acl acl) {
+ return acl.principal().getName();
+ }
+
+ private boolean isPrivilegeForResource(TSentryPrivilege tSentryPrivilege, Resource resource) {
+ final java.util.Iterator<TAuthorizable> authorizablesIterator = tSentryPrivilege.getAuthorizablesIterator();
+ while (authorizablesIterator.hasNext()) {
+ TAuthorizable tAuthorizable = authorizablesIterator.next();
+ if (tAuthorizable.getType().equals(resource.resourceType().name())) {
+ return true;
+ }
}
-
- private java.util.Map<Resource, scala.collection.immutable.Set<Acl>> rolePrivilegesToResourceAcls(java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap) {
- final java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap = new HashMap<>();
- for (String role : rolePrivilegesMap.keySet()) {
- scala.collection.immutable.Set<TSentryPrivilege> privileges = rolePrivilegesMap.get(role);
- final Iterator<TSentryPrivilege> iterator = privileges.iterator();
- while (iterator.hasNext()) {
- TSentryPrivilege privilege = iterator.next();
- final List<TAuthorizable> authorizables = privilege.getAuthorizables();
- String host = null;
- String operation = privilege.getAction();
- for (TAuthorizable tAuthorizable : authorizables) {
- if (tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name())) {
- host = tAuthorizable.getName();
- } else {
- Resource resource = new Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()), tAuthorizable.getName());
- if (operation.equals("*")) {
- operation = "All";
- }
- Acl acl = new Acl(new KafkaPrincipal("role", role), Allow$.MODULE$, host, Operation$.MODULE$.fromString(operation));
- Set<Acl> newAclsJava = new HashSet<Acl>();
- newAclsJava.add(acl);
- addExistingAclsForResource(resourceAclsMap, resource, newAclsJava);
- final scala.collection.mutable.Set<Acl> aclScala = JavaConversions.asScalaSet(newAclsJava);
- resourceAclsMap.put(resource, aclScala.<Acl>toSet());
- }
- }
- }
+ return false;
+ }
+
+ private List<TSentryPrivilege> getAllPrivileges(final List<String> roles) {
+ final List<TSentryPrivilege> tSentryPrivileges = new ArrayList<>();
+ execute(new Command<Void>() {
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ for (String role : roles) {
+ tSentryPrivileges.addAll(client.listPrivilegesByRoleName(
+ requestorName, role, COMPONENT_NAME, instanceName));
}
-
- return resourceAclsMap;
- }
-
- private java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> getRoleToPrivileges(final List<String> roles) {
- final java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap = new HashMap<>();
- execute(new Command<Void>() {
- @Override
- public Void run(SentryGenericServiceClient client) throws Exception {
- for (String role : roles) {
- final Set<TSentryPrivilege> rolePrivileges = client.listPrivilegesByRoleName(
- requestorName, role, COMPONENT_NAME, instanceName);
- final scala.collection.immutable.Set<TSentryPrivilege> rolePrivilegesScala =
- scala.collection.JavaConverters.asScalaSetConverter(rolePrivileges).asScala().toSet();
- rolePrivilegesMap.put(role, rolePrivilegesScala);
- }
- return null;
+ return null;
+ }
+ });
+
+ return tSentryPrivileges;
+ }
+
+ private List<String> getAllRoles() {
+ final List<String> roles = new ArrayList<>();
+ execute(new Command<Void>() {
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ for (TSentryRole tSentryRole : client.listAllRoles(requestorName, COMPONENT_NAME)) {
+ roles.add(tSentryRole.getRoleName());
+ }
+ return null;
+ }
+ });
+
+ return roles;
+ }
+
+ private Map<Resource, scala.collection.immutable.Set<Acl>> getAclsForRoles(final List<String> roles) {
+ return scala.collection.JavaConverters.mapAsScalaMapConverter(
+ rolePrivilegesToResourceAcls(getRoleToPrivileges(roles)))
+ .asScala().toMap(Predef.<Tuple2<Resource, scala.collection.immutable.Set<Acl>>>conforms());
+ }
+
+ private java.util.Map<Resource, scala.collection.immutable.Set<Acl>> rolePrivilegesToResourceAcls(java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap) {
+ final java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap = new HashMap<>();
+ for (String role : rolePrivilegesMap.keySet()) {
+ scala.collection.immutable.Set<TSentryPrivilege> privileges = rolePrivilegesMap.get(role);
+ final Iterator<TSentryPrivilege> iterator = privileges.iterator();
+ while (iterator.hasNext()) {
+ TSentryPrivilege privilege = iterator.next();
+ final List<TAuthorizable> authorizables = privilege.getAuthorizables();
+ String host = null;
+ String operation = privilege.getAction();
+ for (TAuthorizable tAuthorizable : authorizables) {
+ if (tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name())) {
+ host = tAuthorizable.getName();
+ } else {
+ Resource resource = new Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()), tAuthorizable.getName());
+ if (operation.equals("*")) {
+ operation = "All";
}
- });
-
- return rolePrivilegesMap;
+ Acl acl = new Acl(new KafkaPrincipal("role", role), Allow$.MODULE$, host, Operation$.MODULE$.fromString(operation));
+ Set<Acl> newAclsJava = new HashSet<Acl>();
+ newAclsJava.add(acl);
+ addExistingAclsForResource(resourceAclsMap, resource, newAclsJava);
+ final scala.collection.mutable.Set<Acl> aclScala = JavaConversions.asScalaSet(newAclsJava);
+ resourceAclsMap.put(resource, aclScala.<Acl>toSet());
+ }
+ }
+ }
}
- private void addExistingAclsForResource(java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap, Resource resource, java.util.Set<Acl> newAclsJava) {
- final scala.collection.immutable.Set<Acl> existingAcls = resourceAclsMap.get(resource);
- if (existingAcls != null) {
- final Iterator<Acl> aclsIter = existingAcls.iterator();
- while (aclsIter.hasNext()) {
- Acl curAcl = aclsIter.next();
- newAclsJava.add(curAcl);
- }
+ return resourceAclsMap;
+ }
+
+ private java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> getRoleToPrivileges(final List<String> roles) {
+ final java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap = new HashMap<>();
+ execute(new Command<Void>() {
+ @Override
+ public Void run(SentryGenericServiceClient client) throws Exception {
+ for (String role : roles) {
+ final Set<TSentryPrivilege> rolePrivileges = client.listPrivilegesByRoleName(
+ requestorName, role, COMPONENT_NAME, instanceName);
+ final scala.collection.immutable.Set<TSentryPrivilege> rolePrivilegesScala =
+ scala.collection.JavaConverters.asScalaSetConverter(rolePrivileges).asScala().toSet();
+ rolePrivilegesMap.put(role, rolePrivilegesScala);
}
+ return null;
+ }
+ });
+
+ return rolePrivilegesMap;
+ }
+
+ private void addExistingAclsForResource(java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap, Resource resource, java.util.Set<Acl> newAclsJava) {
+ final scala.collection.immutable.Set<Acl> existingAcls = resourceAclsMap.get(resource);
+ if (existingAcls != null) {
+ final Iterator<Acl> aclsIter = existingAcls.iterator();
+ while (aclsIter.hasNext()) {
+ Acl curAcl = aclsIter.next();
+ newAclsJava.add(curAcl);
+ }
}
-
- private boolean roleExists(String role) {
- return getAllRoles().contains(role);
+ }
+
+ private boolean roleExists(String role) {
+ return getAllRoles().contains(role);
+ }
+
+ private void verifyAcls(scala.collection.immutable.Set<Acl> acls) {
+ final Iterator<Acl> iterator = acls.iterator();
+ while (iterator.hasNext()) {
+ final Acl acl = iterator.next();
+ assert acl.principal().getPrincipalType().toLowerCase().equals("role") : "Only Acls with KafkaPrincipal of type \"role;\" is supported.";
+ assert acl.permissionType().name().equals(Allow.name()) : "Only Acls with Permission of type \"Allow\" is supported.";
}
-
- private void verifyAcls(scala.collection.immutable.Set<Acl> acls) {
- final Iterator<Acl> iterator = acls.iterator();
- while (iterator.hasNext()) {
- final Acl acl = iterator.next();
- assert acl.principal().getPrincipalType().toLowerCase().equals("role") : "Only Acls with KafkaPrincipal of type \"role;\" is supported.";
- assert acl.permissionType().name().equals(Allow.name()) : "Only Acls with Permission of type \"Allow\" is supported.";
- }
+ }
+
+ /*
+ * For SSL session's Kafka creates user names with "CN=" prepended to the user name.
+ * "=" is used as splitter by Sentry to parse key value pairs and so it is required to strip off "CN=".
+ * */
+ private String getName(RequestChannel.Session session) {
+ final String principalName = session.principal().getName();
+ int start = principalName.indexOf("CN=");
+ if (start >= 0) {
+ String tmpName, name = "";
+ tmpName = principalName.substring(start + 3);
+ int end = tmpName.indexOf(",");
+ if (end > 0) {
+ name = tmpName.substring(0, end);
+ } else {
+ name = tmpName;
+ }
+ return name;
+ } else {
+ return principalName;
}
-
- /*
- * For SSL session's Kafka creates user names with "CN=" prepended to the user name.
- * "=" is used as splitter by Sentry to parse key value pairs and so it is required to strip off "CN=".
- * */
- private String getName(RequestChannel.Session session) {
- final String principalName = session.principal().getName();
- int start = principalName.indexOf("CN=");
- if (start >= 0) {
- String tmpName, name = "";
- tmpName = principalName.substring(start + 3);
- int end = tmpName.indexOf(",");
- if (end > 0) {
- name = tmpName.substring(0, end);
- } else {
- name = tmpName;
- }
- return name;
- } else {
- return principalName;
- }
+ }
+
+ /**
+ * Initialize kerberos via UserGroupInformation. Will only attempt to login
+ * during the first request, subsequent calls will have no effect.
+ */
+ private void initKerberos(String keytabFile, String principal) {
+ if (keytabFile == null || keytabFile.length() == 0) {
+ throw new IllegalArgumentException("keytabFile required because kerberos is enabled");
}
-
- /**
- * Initialize kerberos via UserGroupInformation. Will only attempt to login
- * during the first request, subsequent calls will have no effect.
- */
- private void initKerberos(String keytabFile, String principal) {
- if (keytabFile == null || keytabFile.length() == 0) {
- throw new IllegalArgumentException("keytabFile required because kerberos is enabled");
- }
- if (principal == null || principal.length() == 0) {
- throw new IllegalArgumentException("principal required because kerberos is enabled");
- }
- synchronized (KafkaAuthBinding.class) {
- if (kerberosInit == null) {
- kerberosInit = new Boolean(true);
- // let's avoid modifying the supplied configuration, just to be conservative
- final Configuration ugiConf = new Configuration();
- ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS);
- UserGroupInformation.setConfiguration(ugiConf);
- LOG.info(
- "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ",
- keytabFile, principal);
- try {
- UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to login user with Principal: " + principal +
- " and Keytab file: " + keytabFile, ioe);
- }
- LOG.info("Got Kerberos ticket");
- }
+ if (principal == null || principal.length() == 0) {
+ throw new IllegalArgumentException("principal required because kerberos is enabled");
+ }
+ synchronized (KafkaAuthBinding.class) {
+ if (kerberosInit == null) {
+ kerberosInit = new Boolean(true);
+ // let's avoid modifying the supplied configuration, just to be conservative
+ final Configuration ugiConf = new Configuration();
+ ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS);
+ UserGroupInformation.setConfiguration(ugiConf);
+ LOG.info(
+ "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ",
+ keytabFile, principal);
+ try {
+ UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to login user with Principal: " + principal +
+ " and Keytab file: " + keytabFile, ioe);
}
+ LOG.info("Got Kerberos ticket");
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
index 0a57e2e..2f4f8df 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
@@ -17,7 +17,7 @@ package org.apache.sentry.kafka.conf;
import java.net.URL;
import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine;
+import org.apache.sentry.policy.engine.common.CommonPolicyEngine;
import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend;
@@ -41,7 +41,7 @@ public class KafkaAuthConf extends Configuration {
AUTHZ_PROVIDER("sentry.kafka.provider", HadoopGroupResourceAuthorizationProvider.class.getName()),
AUTHZ_PROVIDER_RESOURCE("sentry.kafka.provider.resource", ""),
AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()),
- AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()),
+ AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", CommonPolicyEngine.class.getName()),
AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"),
AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka"),
AUTHZ_PRINCIPAL_HOSTNAME(KAFKA_PRINCIPAL_HOSTNAME, null),
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
new file mode 100644
index 0000000..086b707
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public abstract class AbstractTestKafkaPolicyEngine {
+
+ private static final String ADMIN = "host=*->action=all";
+ private static final String ADMIN_HOST1 = "host=host1->action=all";
+ private static final String CONSUMER_T1_ALL = "host=*->topic=t1->action=read";
+ private static final String CONSUMER_T1_HOST1 = "host=host1->topic=t1->action=read";
+ private static final String CONSUMER_T2_HOST2 = "host=host2->topic=t2->action=read";
+ private static final String PRODUCER_T1_ALL = "host=*->topic=t1->action=write";
+ private static final String PRODUCER_T1_HOST1 = "host=host1->topic=t1->action=write";
+ private static final String PRODUCER_T2_HOST2 = "host=host2->topic=t2->action=write";
+ private static final String CONSUMER_PRODUCER_T1 = "host=host1->topic=t1->action=all";
+
+ private PolicyEngine policy;
+ private static File baseDir;
+
+ @BeforeClass
+ public static void setupClazz() throws IOException {
+ baseDir = Files.createTempDir();
+ }
+
+ @AfterClass
+ public static void teardownClazz() throws IOException {
+ if (baseDir != null) {
+ FileUtils.deleteQuietly(baseDir);
+ }
+ }
+
+ protected void setPolicy(PolicyEngine policy) {
+ this.policy = policy;
+ }
+
+ protected static File getBaseDir() {
+ return baseDir;
+ }
+
+ @Before
+ public void setup() throws IOException {
+ afterSetup();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ beforeTeardown();
+ }
+
+ protected void afterSetup() throws IOException {}
+
+ protected void beforeTeardown() throws IOException {}
+
+
+ @Test
+ public void testConsumer0() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_ALL));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("consumer_group0"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testConsumer1() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_HOST1));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("consumer_group1"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testConsumer2() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T2_HOST2));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("consumer_group2"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testProducer0() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_ALL));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("producer_group0"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testProducer1() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_HOST1));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("producer_group1"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+
+ @Test
+ public void testProducer2() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T2_HOST2));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("producer_group2"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testConsumerProducer0() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_PRODUCER_T1));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("consumer_producer_group0"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testSubAdmin() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN_HOST1));
+ Assert.assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("subadmin_group"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ @Test
+ public void testAdmin() throws Exception {
+ Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN));
+ Assert
+ .assertEquals(expected.toString(),
+ new TreeSet<String>(policy.getPrivileges(set("admin_group"), ActiveRoleSet.ALL))
+ .toString());
+ }
+
+ private static Set<String> set(String... values) {
+ return Sets.newHashSet(values);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java
new file mode 100644
index 0000000..1ededbd
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.policy.kafka;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.policy.engine.common.CommonPolicyEngine;
+import org.apache.sentry.provider.common.ProviderBackend;
+import org.apache.sentry.provider.common.ProviderBackendContext;
+import org.apache.sentry.provider.file.SimpleFileProviderBackend;
+
+import java.io.IOException;
+
+public class KafkaPolicyTestUtil {
+
+ public static PolicyEngine createPolicyEngineForTest(String resource) throws IOException {
+
+ ProviderBackend providerBackend = new SimpleFileProviderBackend(new Configuration(), resource);
+
+ // create backendContext
+ ProviderBackendContext context = new ProviderBackendContext();
+ context.setAllowPerDatabase(false);
+ context.setValidators(KafkaPrivilegeModel.getInstance().getPrivilegeValidators());
+ // initialize the backend with the context
+ providerBackend.initialize(context);
+
+
+ return new CommonPolicyEngine(providerBackend);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
new file mode 100644
index 0000000..572c74d
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.util.Set;
+
+import org.apache.sentry.provider.common.GroupMappingService;
+
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+public class MockGroupMappingServiceProvider implements GroupMappingService {
+ private final Multimap<String, String> userToGroupMap;
+
+ public MockGroupMappingServiceProvider(Multimap<String, String> userToGroupMap) {
+ this.userToGroupMap = userToGroupMap;
+ }
+ @Override
+ public Set<String> getGroups(String user) {
+ return Sets.newHashSet(userToGroupMap.get(user));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
new file mode 100644
index 0000000..07f4d7d
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.Action;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.Subject;
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.ConsumerGroup;
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
+import org.apache.sentry.provider.common.ResourceAuthorizationProvider;
+import org.apache.sentry.provider.file.PolicyFiles;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public class TestKafkaAuthorizationProviderGeneralCases {
+ private static final Multimap<String, String> USER_TO_GROUP_MAP = HashMultimap.create();
+
+ private static final Host HOST_1 = new Host("host1");
+ private static final Host HOST_2 = new Host("host2");
+ private static final Cluster cluster1 = new Cluster();
+ private static final Topic topic1 = new Topic("t1");
+ private static final Topic topic2 = new Topic("t2");
+ private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1");
+ private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2");
+
+ private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL);
+ private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ);
+ private static final KafkaAction WRITE = new KafkaAction(KafkaActionConstant.WRITE);
+ private static final KafkaAction CREATE = new KafkaAction(KafkaActionConstant.CREATE);
+ private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE);
+ private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER);
+ private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE);
+ private static final KafkaAction CLUSTER_ACTION = new KafkaAction(
+ KafkaActionConstant.CLUSTER_ACTION);
+
+ private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION);
+
+ private static final Subject ADMIN = new Subject("admin1");
+ private static final Subject SUB_ADMIN = new Subject("subadmin1");
+ private static final Subject CONSUMER0 = new Subject("consumer0");
+ private static final Subject CONSUMER1 = new Subject("consumer1");
+ private static final Subject CONSUMER2 = new Subject("consumer2");
+ private static final Subject PRODUCER0 = new Subject("producer0");
+ private static final Subject PRODUCER1 = new Subject("producer1");
+ private static final Subject PRODUCER2 = new Subject("producer2");
+ private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0");
+
+ private static final String ADMIN_GROUP = "admin_group";
+ private static final String SUBADMIN_GROUP = "subadmin_group";
+ private static final String CONSUMER_GROUP0 = "consumer_group0";
+ private static final String CONSUMER_GROUP1 = "consumer_group1";
+ private static final String CONSUMER_GROUP2 = "consumer_group2";
+ private static final String PRODUCER_GROUP0 = "producer_group0";
+ private static final String PRODUCER_GROUP1 = "producer_group1";
+ private static final String PRODUCER_GROUP2 = "producer_group2";
+ private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0";
+
+ static {
+ USER_TO_GROUP_MAP.putAll(ADMIN.getName(), Arrays.asList(ADMIN_GROUP));
+ USER_TO_GROUP_MAP.putAll(SUB_ADMIN.getName(), Arrays.asList(SUBADMIN_GROUP ));
+ USER_TO_GROUP_MAP.putAll(CONSUMER0.getName(), Arrays.asList(CONSUMER_GROUP0));
+ USER_TO_GROUP_MAP.putAll(CONSUMER1.getName(), Arrays.asList(CONSUMER_GROUP1));
+ USER_TO_GROUP_MAP.putAll(CONSUMER2.getName(), Arrays.asList(CONSUMER_GROUP2));
+ USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(), Arrays.asList(PRODUCER_GROUP0));
+ USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(), Arrays.asList(PRODUCER_GROUP1));
+ USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(), Arrays.asList(PRODUCER_GROUP2));
+ USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(), Arrays.asList(CONSUMER_PRODUCER_GROUP0));
+ }
+
+ private final ResourceAuthorizationProvider authzProvider;
+ private File baseDir;
+
+ public TestKafkaAuthorizationProviderGeneralCases() throws IOException {
+ baseDir = Files.createTempDir();
+ PolicyFiles.copyToDir(baseDir, "kafka-policy-test-authz-provider.ini");
+ authzProvider = new HadoopGroupResourceAuthorizationProvider(
+ KafkaPolicyTestUtil.createPolicyEngineForTest(new File(baseDir,
+ "kafka-policy-test-authz-provider.ini").getPath()),
+ new MockGroupMappingServiceProvider(USER_TO_GROUP_MAP), KafkaPrivilegeModel.getInstance());
+ }
+
+ @After
+ public void teardown() {
+ if(baseDir != null) {
+ FileUtils.deleteQuietly(baseDir);
+ }
+ }
+
+ private void doTestResourceAuthorizationProvider(Subject subject, List<? extends Authorizable> authorizableHierarchy,
+ Set<? extends Action> actions, boolean expected) throws Exception {
+ Objects.ToStringHelper helper = Objects.toStringHelper("TestParameters");
+ helper.add("Subject", subject).add("authzHierarchy", authorizableHierarchy).add("action", actions);
+ Assert.assertEquals(helper.toString(), expected,
+ authzProvider.hasAccess(subject, authorizableHierarchy, actions, ActiveRoleSet.ALL));
+ }
+
+ @Test
+ public void testAdmin() throws Exception {
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1), allActions, true);
+
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cluster1), allActions, false);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic1), allActions, false);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic2), allActions, false);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, false);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, false);
+ doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2), allActions, false);
+
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1), allActions, true);
+
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cluster1), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic1), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic2), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, true);
+ doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2), allActions, true);
+ }
+
+ @Test
+ public void testConsumer() throws Exception {
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(CONSUMER0, Arrays.asList(host, topic1),
+ Sets.newHashSet(action), READ.equals(action));
+ }
+ }
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1),
+ Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action));
+ }
+ }
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2),
+ Sets.newHashSet(action), HOST_2.equals(host) && READ.equals(action));
+ }
+ }
+ }
+
+ @Test
+ public void testProducer() throws Exception {
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(PRODUCER0, Arrays.asList(host, topic1),
+ Sets.newHashSet(action), WRITE.equals(action));
+ }
+ }
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1),
+ Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
+ }
+ }
+ for (KafkaAction action : allActions) {
+ for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
+ doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2),
+ Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
+ }
+ }
+ }
+
+ @Test
+ public void testConsumerProducer() throws Exception {
+ for (KafkaAction action : allActions) {
+ doTestResourceAuthorizationProvider(CONSUMER_PRODUCER0, Arrays.asList(HOST_1, topic1),
+ Sets.newHashSet(action), true);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java
new file mode 100644
index 0000000..63d2f30
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.core.common.Action;
+import org.apache.sentry.core.common.ActiveRoleSet;
+import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.Subject;
+import org.apache.sentry.core.model.kafka.KafkaActionConstant;
+import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.sentry.policy.common.PolicyEngine;
+import org.apache.sentry.provider.common.AuthorizationProvider;
+import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider;
+import org.apache.sentry.provider.file.PolicyFile;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+public class TestKafkaAuthorizationProviderSpecialCases {
+ private AuthorizationProvider authzProvider;
+ private PolicyFile policyFile;
+ private File baseDir;
+ private File iniFile;
+ private String initResource;
+ @Before
+ public void setup() throws IOException {
+ baseDir = Files.createTempDir();
+ iniFile = new File(baseDir, "policy.ini");
+ initResource = "file://" + iniFile.getPath();
+ policyFile = new PolicyFile();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ if(baseDir != null) {
+ FileUtils.deleteQuietly(baseDir);
+ }
+ }
+
+ @Test
+ public void testDuplicateEntries() throws Exception {
+ Subject user1 = new Subject("user1");
+ Host host1 = new Host("host1");
+ Topic topic1 = new Topic("t1");
+ Set<? extends Action> actions = Sets.newHashSet(new KafkaAction(KafkaActionConstant.READ));
+ policyFile.addGroupsToUser(user1.getName(), true, "group1", "group1")
+ .addRolesToGroup("group1", true, "role1", "role1")
+ .addPermissionsToRole("role1", true, "host=host1->topic=t1->action=read",
+ "host=host1->topic=t1->action=read");
+ policyFile.write(iniFile);
+ PolicyEngine policy = KafkaPolicyTestUtil.createPolicyEngineForTest(initResource);
+ authzProvider = new LocalGroupResourceAuthorizationProvider(initResource,
+ policy, KafkaPrivilegeModel.getInstance());
+ List<? extends Authorizable> authorizableHierarchy = ImmutableList.of(host1, topic1);
+ Assert.assertTrue(authorizableHierarchy.toString(),
+ authzProvider.hasAccess(user1, authorizableHierarchy, actions, ActiveRoleSet.ALL));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
new file mode 100644
index 0000000..62fbea7
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.fail;
+
+import org.apache.sentry.core.model.kafka.Cluster;
+import org.apache.sentry.core.model.kafka.ConsumerGroup;
+import org.apache.sentry.core.model.kafka.Host;
+import org.apache.sentry.core.model.kafka.KafkaModelAuthorizables;
+import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.shiro.config.ConfigurationException;
+import org.junit.Test;
+
+public class TestKafkaModelAuthorizables {
+
+ @Test
+ public void testHost() throws Exception {
+ Host host1 = (Host) KafkaModelAuthorizables.from("HOST=host1");
+ assertEquals("host1", host1.getName());
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testNoKV() throws Exception {
+ System.out.println(KafkaModelAuthorizables.from("nonsense"));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testEmptyKey() throws Exception {
+ System.out.println(KafkaModelAuthorizables.from("=host1"));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testEmptyValue() throws Exception {
+ System.out.println(KafkaModelAuthorizables.from("HOST="));
+ }
+
+ @Test
+ public void testNotAuthorizable() throws Exception {
+ assertNull(KafkaModelAuthorizables.from("k=v"));
+ }
+
+ @Test
+ public void testResourceNameIsCaseSensitive() throws Exception {
+ Host host1 = (Host)KafkaModelAuthorizables.from("HOST=Host1");
+ assertEquals("Host1", host1.getName());
+
+ Cluster cluster1 = (Cluster)KafkaModelAuthorizables.from("Cluster=kafka-cluster");
+ assertEquals("kafka-cluster", cluster1.getName());
+
+ Topic topic1 = (Topic)KafkaModelAuthorizables.from("topic=topiC1");
+ assertEquals("topiC1", topic1.getName());
+
+ ConsumerGroup consumergroup1 = (ConsumerGroup)KafkaModelAuthorizables.from("ConsumerGroup=CG1");
+ assertEquals("CG1", consumergroup1.getName());
+ }
+
+ @Test
+ public void testClusterResourceNameIsRestricted() throws Exception {
+ try {
+ KafkaModelAuthorizables.from("Cluster=cluster1");
+ fail("Cluster with name other than " + Cluster.NAME + " must not have been created.");
+ } catch (ConfigurationException cex) {
+ assertEquals("Exception message is not as expected.", "Kafka's cluster resource can only have name " + Cluster.NAME, cex.getMessage());
+ } catch (Exception ex) {
+ fail("Configuration exception was expected for invalid Cluster name.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java
new file mode 100644
index 0000000..4299b1f
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.sentry.provider.file.PolicyFiles;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestKafkaPolicyEngineDFS extends AbstractTestKafkaPolicyEngine {
+ private static MiniDFSCluster dfsCluster;
+ private static FileSystem fileSystem;
+ private static Path root;
+ private static Path etc;
+
+ @BeforeClass
+ public static void setupLocalClazz() throws IOException {
+ File baseDir = getBaseDir();
+ Assert.assertNotNull(baseDir);
+ File dfsDir = new File(baseDir, "dfs");
+ Assert.assertTrue(dfsDir.isDirectory() || dfsDir.mkdirs());
+ Configuration conf = new Configuration();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath());
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+ fileSystem = dfsCluster.getFileSystem();
+ root = new Path(fileSystem.getUri().toString());
+ etc = new Path(root, "/etc");
+ fileSystem.mkdirs(etc);
+ }
+
+ @AfterClass
+ public static void teardownLocalClazz() {
+ if(dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+
+ @Override
+ protected void afterSetup() throws IOException {
+ fileSystem.delete(etc, true);
+ fileSystem.mkdirs(etc);
+ PolicyFiles.copyToDir(fileSystem, etc, "kafka-policy-test-authz-provider.ini");
+ setPolicy(KafkaPolicyTestUtil.createPolicyEngineForTest(new Path(etc,
+ "kafka-policy-test-authz-provider.ini").toString()));
+ }
+
+ @Override
+ protected void beforeTeardown() throws IOException {
+ fileSystem.delete(etc, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java
new file mode 100644
index 0000000..9a69f1c
--- /dev/null
+++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sentry.policy.kafka;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.provider.file.PolicyFiles;
+
+public class TestKafkaPolicyEngineLocalFS extends AbstractTestKafkaPolicyEngine {
+
+ @Override
+ protected void afterSetup() throws IOException {
+ File baseDir = getBaseDir();
+ Assert.assertNotNull(baseDir);
+ Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
+ PolicyFiles.copyToDir(baseDir, "kafka-policy-test-authz-provider.ini");
+ setPolicy(KafkaPolicyTestUtil.createPolicyEngineForTest(new File(baseDir,
+ "kafka-policy-test-authz-provider.ini").getPath()));
+ }
+
+ @Override
+ protected void beforeTeardown() throws IOException {
+ File baseDir = getBaseDir();
+ Assert.assertNotNull(baseDir);
+ FileUtils.deleteQuietly(baseDir);
+ }
+}