You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2016/04/16 05:21:02 UTC
[1/2] sentry git commit: SENTRY-1162: Add shell for Sentry Kafka
integration (Ashish K Singh, Reviewed by:Hao Hao)
Repository: sentry
Updated Branches:
refs/heads/master 65f5ffe45 -> 6d79016aa
SENTRY-1162: Add shell for Sentry Kafka integration (Ashish K Singh, Reviewed by:Hao Hao)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/70d0ecce
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/70d0ecce
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/70d0ecce
Branch: refs/heads/master
Commit: 70d0eccef8b25d9db16001f9e4200aecca5542a5
Parents: 65f5ffe
Author: hahao <ha...@cloudera.com>
Authored: Fri Apr 15 17:30:31 2016 -0700
Committer: hahao <ha...@cloudera.com>
Committed: Fri Apr 15 20:15:57 2016 -0700
----------------------------------------------------------------------
sentry-provider/sentry-provider-db/pom.xml | 4 +
.../tools/KafkaTSentryPrivilegeConvertor.java | 109 ++++
.../db/generic/tools/SentryShellKafka.java | 112 ++++
.../db/generic/tools/TestSentryShellKafka.java | 540 +++++++++++++++++++
4 files changed, 765 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/70d0ecce/sentry-provider/sentry-provider-db/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/pom.xml b/sentry-provider/sentry-provider-db/pom.xml
index 205ffe6..eb9de88 100644
--- a/sentry-provider/sentry-provider-db/pom.xml
+++ b/sentry-provider/sentry-provider-db/pom.xml
@@ -107,6 +107,10 @@ limitations under the License.
<artifactId>sentry-policy-search</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.sentry</groupId>
+ <artifactId>sentry-policy-kafka</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/sentry/blob/70d0ecce/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/KafkaTSentryPrivilegeConvertor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/KafkaTSentryPrivilegeConvertor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/KafkaTSentryPrivilegeConvertor.java
new file mode 100644
index 0000000..ca88c25
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/KafkaTSentryPrivilegeConvertor.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.generic.tools;
+
+import com.google.common.collect.Lists;
+import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
+import org.apache.sentry.policy.common.KeyValue;
+import org.apache.sentry.policy.common.PolicyConstants;
+import org.apache.sentry.policy.common.PrivilegeValidatorContext;
+import org.apache.sentry.policy.kafka.KafkaModelAuthorizables;
+import org.apache.sentry.policy.kafka.KafkaPrivilegeValidator;
+import org.apache.sentry.provider.common.PolicyFileConstants;
+import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
+import org.apache.sentry.provider.db.generic.service.thrift.TSentryGrantOption;
+import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
+import org.apache.sentry.provider.db.generic.tools.command.TSentryPrivilegeConvertor;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Converts between the textual form of a Kafka privilege (authorizable
+ * key/value pairs followed by an action, e.g.
+ * "HOST=h1->TOPIC=t1->action=write") and its {@link TSentryPrivilege}
+ * thrift representation for a fixed component and service.
+ */
+public class KafkaTSentryPrivilegeConvertor implements TSentryPrivilegeConvertor {
+  private final String component;
+  private final String service;
+
+  /**
+   * @param component component name stamped on every privilege built by fromString
+   * @param service service name stamped on every privilege built by fromString
+   */
+  public KafkaTSentryPrivilegeConvertor(String component, String service) {
+    this.component = component;
+    this.service = service;
+  }
+
+  /**
+   * Parses a privilege string into a TSentryPrivilege.
+   * The string is first checked by KafkaPrivilegeValidator; each part is then
+   * either recorded as an authorizable or, when its key is the action key,
+   * used as the privilege action. Parts that are neither are ignored.
+   *
+   * @param privilegeStr the privilege in its textual form
+   * @return the privilege carrying this convertor's component and service
+   * @throws IllegalArgumentException if no action part is present
+   * @throws Exception whatever the validator or the key/value parsing raises
+   */
+  public TSentryPrivilege fromString(String privilegeStr) throws Exception {
+    validatePrivilegeHierarchy(privilegeStr);
+    TSentryPrivilege tSentryPrivilege = new TSentryPrivilege();
+    List<TAuthorizable> authorizables = new LinkedList<TAuthorizable>();
+    for (String authorizable : PolicyConstants.AUTHORIZABLE_SPLITTER.split(privilegeStr)) {
+      KeyValue keyValue = new KeyValue(authorizable);
+      String key = keyValue.getKey();
+      String value = keyValue.getValue();
+
+      // is it an authorizable?
+      KafkaAuthorizable authz = KafkaModelAuthorizables.from(keyValue);
+      if (authz != null) {
+        authorizables.add(new TAuthorizable(authz.getTypeName(), authz.getName()));
+      } else if (PolicyFileConstants.PRIVILEGE_ACTION_NAME.equalsIgnoreCase(key)) {
+        tSentryPrivilege.setAction(value);
+      }
+    }
+
+    if (tSentryPrivilege.getAction() == null) {
+      throw new IllegalArgumentException("Privilege is invalid: action required but not specified.");
+    }
+    tSentryPrivilege.setComponent(component);
+    tSentryPrivilege.setServiceName(service);
+    tSentryPrivilege.setAuthorizables(authorizables);
+    return tSentryPrivilege;
+  }
+
+  /**
+   * Renders a TSentryPrivilege back into its textual form.
+   * The action is only emitted when at least one authorizable is present, and
+   * the grant option is only emitted when it is TRUE. A null privilege, or a
+   * privilege whose authorizable list is null, contributes no authorizable
+   * parts instead of failing with a NullPointerException.
+   *
+   * @param tSentryPrivilege the privilege to render; may be null
+   * @return the joined privilege string, empty when there is nothing to render
+   */
+  public String toString(TSentryPrivilege tSentryPrivilege) {
+    List<String> privileges = Lists.newArrayList();
+    if (tSentryPrivilege != null) {
+      List<TAuthorizable> authorizables = tSentryPrivilege.getAuthorizables();
+      // The list may be unset on a hand-built privilege; treat that as empty.
+      if (authorizables != null && !authorizables.isEmpty()) {
+        for (TAuthorizable tAuthorizable : authorizables) {
+          privileges.add(PolicyConstants.KV_JOINER.join(
+              tAuthorizable.getType(), tAuthorizable.getName()));
+        }
+        privileges.add(PolicyConstants.KV_JOINER.join(
+            PolicyFileConstants.PRIVILEGE_ACTION_NAME, tSentryPrivilege.getAction()));
+      }
+
+      // only append the grant option to privilege string if it's true
+      if (tSentryPrivilege.getGrantOption() == TSentryGrantOption.TRUE) {
+        privileges.add(PolicyConstants.KV_JOINER.join(
+            PolicyFileConstants.PRIVILEGE_GRANT_OPTION_NAME, "true"));
+      }
+    }
+    return PolicyConstants.AUTHORIZABLE_JOINER.join(privileges);
+  }
+
+  /** Runs the Kafka privilege validator over the raw privilege string. */
+  private static void validatePrivilegeHierarchy(String privilegeStr) throws Exception {
+    new KafkaPrivilegeValidator().validate(new PrivilegeValidatorContext(privilegeStr));
+  }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/70d0ecce/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java
new file mode 100644
index 0000000..e15d8d2
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellKafka.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.generic.tools;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory;
+import org.apache.sentry.provider.db.generic.tools.command.*;
+import org.apache.sentry.provider.db.tools.SentryShellCommon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SentryShellKafka is an admin tool for managing Sentry roles and privileges
+ * of the Kafka component from the command line.
+ * The following commands are supported:
+ * create role, drop role, add role to group, delete role from group,
+ * grant privilege to role, revoke privilege from role, list roles,
+ * list privileges for role.
+ */
+public class SentryShellKafka extends SentryShellCommon {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryShellKafka.class);
+  public static final String KAFKA_SERVICE_NAME = "sentry.service.client.kafka.service.name";
+
+  /**
+   * Executes the command selected by the parsed options against the Sentry
+   * generic service, acting as the current login user.
+   * The service client is closed when the command finishes or fails.
+   *
+   * @throws Exception if the requestor name is empty or the command fails
+   */
+  @Override
+  public void run() throws Exception {
+    String component = "KAFKA";
+    Configuration conf = getSentryConf();
+
+    String service = conf.get(KAFKA_SERVICE_NAME, "kafka1");
+    SentryGenericServiceClient client = SentryGenericServiceClientFactory.create(conf);
+    try {
+      UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+      String requestorName = ugi.getShortUserName();
+
+      Command command = createCommand(component, service);
+
+      // check the requestor name
+      if (StringUtils.isEmpty(requestorName)) {
+        // The exception message will be recorded in log file.
+        throw new Exception("The requestor name is empty.");
+      }
+
+      if (command != null) {
+        command.execute(client, requestorName);
+      }
+    } finally {
+      // Release the connection to the Sentry service on every exit path.
+      client.close();
+    }
+  }
+
+  /**
+   * Maps the option flags parsed by SentryShellCommon to the matching command.
+   *
+   * @return the command to execute, or null when no command flag is set
+   */
+  private Command createCommand(String component, String service) {
+    if (isCreateRole) {
+      return new CreateRoleCmd(roleName, component);
+    }
+    if (isDropRole) {
+      return new DropRoleCmd(roleName, component);
+    }
+    if (isAddRoleGroup) {
+      return new AddRoleToGroupCmd(roleName, groupName, component);
+    }
+    if (isDeleteRoleGroup) {
+      return new DeleteRoleFromGroupCmd(roleName, groupName, component);
+    }
+    if (isGrantPrivilegeRole) {
+      return new GrantPrivilegeToRoleCmd(roleName, component,
+          privilegeStr, new KafkaTSentryPrivilegeConvertor(component, service));
+    }
+    if (isRevokePrivilegeRole) {
+      return new RevokePrivilegeFromRoleCmd(roleName, component,
+          privilegeStr, new KafkaTSentryPrivilegeConvertor(component, service));
+    }
+    if (isListRole) {
+      return new ListRolesCmd(groupName, component);
+    }
+    if (isListPrivilege) {
+      return new ListPrivilegesByRoleCmd(roleName, component,
+          service, new KafkaTSentryPrivilegeConvertor(component, service));
+    }
+    return null;
+  }
+
+  /** Loads the configuration from the file given by the conf option. */
+  private Configuration getSentryConf() {
+    Configuration conf = new Configuration();
+    conf.addResource(new Path(confPath));
+    return conf;
+  }
+
+  /**
+   * Command line entry point. On failure the error is logged, the first
+   * non-null message in the cause chain is printed to stdout, and the JVM
+   * exits with status 1.
+   */
+  public static void main(String[] args) throws Exception {
+    SentryShellKafka sentryShell = new SentryShellKafka();
+    try {
+      sentryShell.executeShell(args);
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
+      Throwable current = e;
+      // find the first printable message;
+      while (current != null && current.getMessage() == null) {
+        current = current.getCause();
+      }
+      String error = "";
+      if (current != null && current.getMessage() != null) {
+        error = "Message: " + current.getMessage();
+      }
+      System.out.println("The operation failed. " + error);
+      System.exit(1);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/70d0ecce/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/tools/TestSentryShellKafka.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/tools/TestSentryShellKafka.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/tools/TestSentryShellKafka.java
new file mode 100644
index 0000000..7d25ae1
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/tools/TestSentryShellKafka.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.generic.tools;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.sentry.SentryUserException;
+import org.apache.sentry.policy.kafka.KafkaPrivilegeValidator;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceIntegrationBase;
+import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
+import org.apache.sentry.provider.db.generic.service.thrift.TSentryRole;
+import org.apache.sentry.provider.db.tools.SentryShellCommon;
+import org.apache.shiro.config.ConfigurationException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+public class TestSentryShellKafka extends SentryGenericServiceIntegrationBase {
+ private File confDir;
+ private File confPath;
+ private static String TEST_ROLE_NAME_1 = "testRole1";
+ private static String TEST_ROLE_NAME_2 = "testRole2";
+ private static String KAFKA = "KAFKA";
+ private String requestorName = "";
+ private String service = "kafka1";
+
+  /**
+   * Writes the test configuration to a fresh sentry-site.xml in a temporary
+   * directory and registers the current OS user and ADMIN_USER as members of
+   * the admin group.
+   */
+  @Before
+  public void prepareForTest() throws Exception {
+    confDir = Files.createTempDir();
+    confPath = new File(confDir, "sentry-site.xml");
+    if (confPath.createNewFile()) {
+      FileOutputStream to = new FileOutputStream(confPath);
+      try {
+        conf.writeXml(to);
+      } finally {
+        // Close the stream even when writing the configuration fails.
+        to.close();
+      }
+    }
+    requestorName = System.getProperty("user.name", "");
+    Set<String> requestorUserGroupNames = Sets.newHashSet(ADMIN_GROUP);
+    setLocalGroupMapping(requestorName, requestorUserGroupNames);
+    // add ADMIN_USER for the after() in SentryServiceIntegrationBase
+    setLocalGroupMapping(ADMIN_USER, requestorUserGroupNames);
+    writePolicyFile();
+  }
+
+  /** Removes the temporary configuration directory created for the test. */
+  @After
+  public void clearTestData() throws Exception {
+    FileUtils.deleteQuietly(confDir);
+  }
+
+ /**
+ * Creates two roles through the shell (short and long option), checks that
+ * both list-role option spellings report them, drops them through the shell
+ * (short and long option) and verifies through the client that none remain.
+ */
+ @Test
+ public void testCreateDropRole() throws Exception {
+ runTestAsSubject(new TestOperation() {
+ @Override
+ public void runTestAsSubject() throws Exception {
+ // test: create role with -cr
+ String[] args = { "-cr", "-r", TEST_ROLE_NAME_1, "-conf", confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+ // test: create role with --create_role
+ args = new String[] { "--create_role", "-r", TEST_ROLE_NAME_2, "-conf",
+ confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+
+ // validate the result, list roles with -lr
+ args = new String[] { "-lr", "-conf", confPath.getAbsolutePath() };
+ SentryShellKafka sentryShell = new SentryShellKafka();
+ Set<String> roleNames = getShellResultWithOSRedirect(sentryShell, args, true);
+ validateRoleNames(roleNames, TEST_ROLE_NAME_1, TEST_ROLE_NAME_2);
+
+ // validate the result, list roles with --list_role
+ args = new String[] { "--list_role", "-conf", confPath.getAbsolutePath() };
+ sentryShell = new SentryShellKafka();
+ roleNames = getShellResultWithOSRedirect(sentryShell, args, true);
+ validateRoleNames(roleNames, TEST_ROLE_NAME_1, TEST_ROLE_NAME_2);
+
+ // test: drop role with -dr
+ args = new String[] { "-dr", "-r", TEST_ROLE_NAME_1, "-conf", confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+ // test: drop role with --drop_role
+ args = new String[] { "--drop_role", "-r", TEST_ROLE_NAME_2, "-conf",
+ confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+
+ // validate the result: both roles were dropped, so the client sees none
+ Set<TSentryRole> roles = client.listAllRoles(requestorName, KAFKA);
+ assertEquals("Incorrect number of roles", 0, roles.size());
+ }
+ });
+ }
+
+ /**
+ * Adds roles to groups through the shell (single group, comma-separated
+ * groups, long option), checks the per-group role listing, removes the
+ * roles from the groups again and verifies through the client that no
+ * group has a role left.
+ */
+ @Test
+ public void testAddDeleteRoleForGroup() throws Exception {
+ runTestAsSubject(new TestOperation() {
+ @Override
+ public void runTestAsSubject() throws Exception {
+ // Group names are case sensitive - mixed case names should work
+ String TEST_GROUP_1 = "testGroup1";
+ String TEST_GROUP_2 = "testGroup2";
+ String TEST_GROUP_3 = "testGroup3";
+
+ // create the role for test
+ client.createRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+ client.createRole(requestorName, TEST_ROLE_NAME_2, KAFKA);
+ // test: add role to group with -arg
+ String[] args = { "-arg", "-r", TEST_ROLE_NAME_1, "-g", TEST_GROUP_1, "-conf",
+ confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+ // test: add role to multiple groups
+ args = new String[] { "-arg", "-r", TEST_ROLE_NAME_1, "-g", TEST_GROUP_2 + "," + TEST_GROUP_3,
+ "-conf",
+ confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+ // test: add role to group with --add_role_group
+ args = new String[] { "--add_role_group", "-r", TEST_ROLE_NAME_2, "-g", TEST_GROUP_1,
+ "-conf",
+ confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+
+ // validate the result list roles with -lr and -g
+ args = new String[] { "-lr", "-g", TEST_GROUP_1, "-conf", confPath.getAbsolutePath() };
+ SentryShellKafka sentryShell = new SentryShellKafka();
+ Set<String> roleNames = getShellResultWithOSRedirect(sentryShell, args, true);
+ validateRoleNames(roleNames, TEST_ROLE_NAME_1, TEST_ROLE_NAME_2);
+
+ // list roles with --list_role and -g
+ args = new String[] { "--list_role", "-g", TEST_GROUP_2, "-conf",
+ confPath.getAbsolutePath() };
+ sentryShell = new SentryShellKafka();
+ roleNames = getShellResultWithOSRedirect(sentryShell, args, true);
+ validateRoleNames(roleNames, TEST_ROLE_NAME_1);
+
+ args = new String[] { "--list_role", "-g", TEST_GROUP_3, "-conf",
+ confPath.getAbsolutePath() };
+ sentryShell = new SentryShellKafka();
+ roleNames = getShellResultWithOSRedirect(sentryShell, args, true);
+ validateRoleNames(roleNames, TEST_ROLE_NAME_1);
+
+ // test: delete role from group with -drg
+ args = new String[] { "-drg", "-r", TEST_ROLE_NAME_1, "-g", TEST_GROUP_1, "-conf",
+ confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+ // test: delete role from multiple groups
+ args = new String[] { "-drg", "-r", TEST_ROLE_NAME_1, "-g", TEST_GROUP_2 + "," + TEST_GROUP_3,
+ "-conf",
+ confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+ // test: delete role from group with --delete_role_group
+ args = new String[] { "--delete_role_group", "-r", TEST_ROLE_NAME_2, "-g", TEST_GROUP_1,
+ "-conf", confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+
+ // validate the result: every group must be left without roles
+ Set<TSentryRole> roles = client.listRolesByGroupName(requestorName, TEST_GROUP_1, KAFKA);
+ assertEquals("Incorrect number of roles", 0, roles.size());
+ roles = client.listRolesByGroupName(requestorName, TEST_GROUP_2, KAFKA);
+ assertEquals("Incorrect number of roles", 0, roles.size());
+ roles = client.listRolesByGroupName(requestorName, TEST_GROUP_3, KAFKA);
+ assertEquals("Incorrect number of roles", 0, roles.size());
+ // clear the test data
+ client.dropRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+ client.dropRole(requestorName, TEST_ROLE_NAME_2, KAFKA);
+ }
+ });
+ }
+
+  /**
+   * Verifies that group names are matched case sensitively: a role added to
+   * "group1" is listed for "group1" but must not be listed for "GROUP1".
+   */
+  @Test
+  public void testCaseSensitiveGroupName() throws Exception {
+    runTestAsSubject(new TestOperation() {
+      @Override
+      public void runTestAsSubject() throws Exception {
+
+        // create the role for test
+        client.createRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+        // add role to a group (lower case)
+        String[] args = {"-arg", "-r", TEST_ROLE_NAME_1, "-g", "group1", "-conf",
+            confPath.getAbsolutePath()};
+        SentryShellKafka.main(args);
+
+        // validate the roles when group name is same case as above
+        args = new String[]{"-lr", "-g", "group1", "-conf", confPath.getAbsolutePath()};
+        SentryShellKafka sentryShell = new SentryShellKafka();
+        Set<String> roleNames = getShellResultWithOSRedirect(sentryShell, args, true);
+        validateRoleNames(roleNames, TEST_ROLE_NAME_1);
+
+        // roles should be empty when group name is different case than above
+        args = new String[]{"-lr", "-g", "GROUP1", "-conf", confPath.getAbsolutePath()};
+        sentryShell = new SentryShellKafka();
+        roleNames = getShellResultWithOSRedirect(sentryShell, args, true);
+        // Splitting empty output yields a single empty string, so check the
+        // listed names directly instead of relying on the set size.
+        for (String listedRole : roleNames) {
+          assertFalse("Role must not be listed for group GROUP1: " + listedRole,
+              TEST_ROLE_NAME_1.equalsIgnoreCase(listedRole.trim()));
+        }
+
+        // clear the test data
+        client.dropRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+      }
+    });
+  }
+
+  /** Returns the grant-privilege option in its short or long spelling. */
+  public static String grant(boolean shortOption) {
+    if (shortOption) {
+      return "-gpr";
+    }
+    return "--grant_privilege_role";
+  }
+
+  /** Returns the revoke-privilege option in its short or long spelling. */
+  public static String revoke(boolean shortOption) {
+    if (shortOption) {
+      return "-rpr";
+    }
+    return "--revoke_privilege_role";
+  }
+
+  /** Returns the list-privilege option in its short or long spelling. */
+  public static String list(boolean shortOption) {
+    if (shortOption) {
+      return "-lp";
+    }
+    return "--list_privilege";
+  }
+
+ /**
+ * Grants three Kafka privileges to a role through the shell, checks that the
+ * list-privilege output contains exactly those privilege strings, then
+ * revokes them one at a time and checks the remaining count via the client.
+ *
+ * @param shortOption true to use the short option spellings, false for the long ones
+ */
+ private void assertGrantRevokePrivilege(final boolean shortOption) throws Exception {
+ runTestAsSubject(new TestOperation() {
+ @Override
+ public void runTestAsSubject() throws Exception {
+ // create the role for test
+ client.createRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+ client.createRole(requestorName, TEST_ROLE_NAME_2, KAFKA);
+
+ String [] privs = {
+ "HOST=*->CLUSTER=kafka-cluster->action=read",
+ "HOST=h1->TOPIC=t1->action=write",
+ "HOST=*->CONSUMERGROUP=cg1->action=read",
+ };
+ for (int i = 0; i < privs.length; ++i) {
+ // test: grant privilege to role
+ String [] args = new String [] { grant(shortOption), "-r", TEST_ROLE_NAME_1, "-p",
+ privs[ i ],
+ "-conf", confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+ }
+
+ // test the list privilege
+ String [] args = new String[] { list(shortOption), "-r", TEST_ROLE_NAME_1, "-conf", confPath.getAbsolutePath() };
+ SentryShellKafka sentryShell = new SentryShellKafka();
+ Set<String> privilegeStrs = getShellResultWithOSRedirect(sentryShell, args, true);
+
+ assertEquals("Incorrect number of privileges", privs.length, privilegeStrs.size());
+ for (int i = 0; i < privs.length; ++i) {
+ assertTrue("Expected privilege: " + privs[i] + " in " + Arrays.toString(privilegeStrs.toArray()), privilegeStrs.contains(privs[i]));
+ }
+
+ // revoke one privilege per iteration; after the i-th revoke,
+ // privs.length - (i + 1) privileges must remain on the role
+ for (int i = 0; i < privs.length; ++i) {
+ args = new String[] { revoke(shortOption), "-r", TEST_ROLE_NAME_1, "-p",
+ privs[ i ], "-conf",
+ confPath.getAbsolutePath() };
+ SentryShellKafka.main(args);
+ Set<TSentryPrivilege> privileges = client.listPrivilegesByRoleName(requestorName,
+ TEST_ROLE_NAME_1, KAFKA, service);
+ assertEquals("Incorrect number of privileges. Received privileges: " + Arrays.toString(privileges.toArray()), privs.length - (i + 1), privileges.size());
+ }
+
+ // clear the test data
+ client.dropRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+ client.dropRole(requestorName, TEST_ROLE_NAME_2, KAFKA);
+ }
+ });
+ }
+
+
+ /** Runs the grant/list/revoke scenario with the short option spellings. */
+ @Test
+ public void testGrantRevokePrivilegeWithShortOption() throws Exception {
+ assertGrantRevokePrivilege(true);
+ }
+
+ /** Runs the grant/list/revoke scenario with the long option spellings. */
+ @Test
+ public void testGrantRevokePrivilegeWithLongOption() throws Exception {
+ assertGrantRevokePrivilege(false);
+ }
+
+
+  /**
+   * Verifies that the shell rejects invalid requests with the expected
+   * exception type: duplicate role creation, operations on a non-existent
+   * role, a malformed privilege string and a privilege whose authorizables
+   * are in the wrong order.
+   */
+  @Test
+  public void testNegativeCaseWithInvalidArgument() throws Exception {
+    runTestAsSubject(new TestOperation() {
+      @Override
+      public void runTestAsSubject() throws Exception {
+        String confFile = confPath.getAbsolutePath();
+        client.createRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+
+        // test: create duplicate role with -cr
+        expectShellFailure(SentryUserException.class,
+            "Exception should be thrown for creating duplicate role",
+            "-cr", "-r", TEST_ROLE_NAME_1, "-conf", confFile);
+
+        // test: drop non-exist role with -dr
+        expectShellFailure(SentryUserException.class,
+            "Exception should be thrown for dropping non-exist role",
+            "-dr", "-r", TEST_ROLE_NAME_2, "-conf", confFile);
+
+        // test: add non-exist role to group with -arg
+        expectShellFailure(SentryUserException.class,
+            "Exception should be thrown for granting non-exist role to group",
+            "-arg", "-r", TEST_ROLE_NAME_2, "-g", "testGroup1", "-conf", confFile);
+
+        // test: drop group from non-exist role with -drg
+        expectShellFailure(SentryUserException.class,
+            "Exception should be thrown for drop group from non-exist role",
+            "-drg", "-r", TEST_ROLE_NAME_2, "-g", "testGroup1", "-conf", confFile);
+
+        // test: grant privilege to role with the error privilege format
+        expectShellFailure(IllegalArgumentException.class,
+            "Exception should be thrown for the error privilege format, invalid key value.",
+            "-gpr", "-r", TEST_ROLE_NAME_1, "-p", "serverserver1->action=all", "-conf", confFile);
+
+        // test: grant privilege to role with the error privilege hierarchy
+        expectShellFailure(ConfigurationException.class,
+            "Exception should be thrown for the error privilege hierarchy.",
+            "-gpr", "-r", TEST_ROLE_NAME_1, "-p", "consumergroup=cg1->host=h1->action=create",
+            "-conf", confFile);
+
+        // clear the test data
+        client.dropRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+      }
+    });
+  }
+
+  /**
+   * Runs a fresh shell with the given arguments and fails the test unless it
+   * throws an exception of the expected type.
+   *
+   * @param expectedType exception type (or supertype) the shell must throw
+   * @param failureMessage message reported when no exception is thrown
+   * @param args command line passed to the shell
+   */
+  private void expectShellFailure(Class<? extends Exception> expectedType,
+      String failureMessage, String... args) {
+    try {
+      new SentryShellKafka().executeShell(args);
+    } catch (Exception e) {
+      if (!expectedType.isInstance(e)) {
+        fail("Unexpected exception received. " + e);
+      }
+      // expected exception
+      return;
+    }
+    fail(failureMessage);
+  }
+
+  /**
+   * Verifies that the shell reports every missing required option (-conf, -r,
+   * -g, -p, the command option itself) and that a privilege without an
+   * action is rejected by the Kafka privilege validator.
+   */
+  @Test
+  public void testNegativeCaseWithoutRequiredArgument() throws Exception {
+    runTestAsSubject(new TestOperation() {
+      @Override
+      public void runTestAsSubject() throws Exception {
+        String strOptionConf = "conf";
+        String confFile = confPath.getAbsolutePath();
+        String missingRole =
+            SentryShellCommon.PREFIX_MESSAGE_MISSING_OPTION + SentryShellCommon.OPTION_DESC_ROLE_NAME;
+        String missingGroup =
+            SentryShellCommon.PREFIX_MESSAGE_MISSING_OPTION + SentryShellCommon.OPTION_DESC_GROUP_NAME;
+        String missingPrivilege =
+            SentryShellCommon.PREFIX_MESSAGE_MISSING_OPTION + SentryShellCommon.OPTION_DESC_PRIVILEGE;
+
+        client.createRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+        // test: the conf is required argument
+        String[] args = { "-cr", "-r", TEST_ROLE_NAME_1 };
+        validateMissingParameterMsg(new SentryShellKafka(), args,
+            SentryShellCommon.PREFIX_MESSAGE_MISSING_OPTION + strOptionConf);
+
+        // test: -r is required when create role
+        args = new String[] { "-cr", "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingRole);
+
+        // test: -r is required when drop role
+        args = new String[] { "-dr", "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingRole);
+
+        // test: -r is required when add role to group
+        args = new String[] { "-arg", "-g", "testGroup1", "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingRole);
+
+        // test: -g is required when add role to group
+        args = new String[] { "-arg", "-r", TEST_ROLE_NAME_2, "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingGroup);
+
+        // test: -r is required when delete role from group
+        args = new String[] { "-drg", "-g", "testGroup1", "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingRole);
+
+        // test: -g is required when delete role from group
+        args = new String[] { "-drg", "-r", TEST_ROLE_NAME_2, "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingGroup);
+
+        // test: -r is required when grant privilege to role
+        args = new String[] { "-gpr", "-p", "server=server1", "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingRole);
+
+        // test: -p is required when grant privilege to role
+        args = new String[] { "-gpr", "-r", TEST_ROLE_NAME_1, "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingPrivilege);
+
+        // test: action is required in privilege
+        args = new String[] { "-gpr", "-r", TEST_ROLE_NAME_1, "-conf", confFile, "-p", "host=*->topic=t1" };
+        try {
+          // Call the shell directly: this case is expected to throw, and no
+          // captured output is needed, so stdout is left untouched.
+          new SentryShellKafka().executeShell(args);
+          fail("Expected ConfigurationException");
+        } catch (ConfigurationException e) {
+          // JUnit assertion instead of the assert keyword, so the check runs
+          // even when JVM assertions are disabled.
+          assertEquals("Kafka privilege must end with a valid action.\n"
+              + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, e.getMessage());
+        } catch (Exception e) {
+          fail ("Unexpected exception received. " + e);
+        }
+
+        // test: -r is required when revoke privilege from role
+        args = new String[] { "-rpr", "-p", "host=h1", "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingRole);
+
+        // test: -p is required when revoke privilege from role
+        args = new String[] { "-rpr", "-r", TEST_ROLE_NAME_1, "-conf", confFile };
+        validateMissingParameterMsg(new SentryShellKafka(), args, missingPrivilege);
+
+        // test: command option is required for shell
+        args = new String[] {"-conf", confFile };
+        validateMissingParameterMsgsContains(new SentryShellKafka(), args,
+            SentryShellCommon.PREFIX_MESSAGE_MISSING_OPTION + "[",
+            "-arg Add role to group",
+            "-cr Create role",
+            "-rpr Revoke privilege from role",
+            "-drg Delete role from group",
+            "-lr List role",
+            "-lp List privilege",
+            "-gpr Grant privilege to role",
+            "-dr Drop role");
+
+        // clear the test data
+        client.dropRole(requestorName, TEST_ROLE_NAME_1, KAFKA);
+      }
+    });
+  }
+
+ // Runs the shell with System.out redirected to an in-memory buffer, asserts that
+ // executeShell(args) returns expectedExecuteResult, and returns the captured output
+ // split on "\n" as a set of lines.
+ // System.out is restored in a finally block so that a thrown exception or a failed
+ // assertion cannot leave stdout redirected for the tests that run afterwards.
+ private Set<String> getShellResultWithOSRedirect(SentryShellKafka sentryShell,
+     String[] args, boolean expectedExecuteResult) throws Exception {
+   PrintStream oldOut = System.out;
+   ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+   PrintStream capturingOut = new PrintStream(outContent);
+   System.setOut(capturingOut);
+   try {
+     assertEquals(expectedExecuteResult, sentryShell.executeShell(args));
+   } finally {
+     // Push any pending text into the buffer, then put the real stdout back.
+     capturingOut.flush();
+     System.setOut(oldOut);
+   }
+   return Sets.newHashSet(outContent.toString().split("\n"));
+ }
+
+ // Checks that roleNames holds exactly as many entries as expectedRoleNames and that
+ // every expected name is present, comparing case-insensitively. Nothing is checked
+ // when no expected names are supplied.
+ private void validateRoleNames(Set<String> roleNames, String ... expectedRoleNames) {
+   if (expectedRoleNames == null || expectedRoleNames.length == 0) {
+     return;
+   }
+   assertEquals("Found: " + roleNames.size() + " roles, expected: " + expectedRoleNames.length,
+       expectedRoleNames.length, roleNames.size());
+
+   // Normalise the actual names once so each lookup below is a plain set probe.
+   Set<String> normalizedActual = new HashSet<String>();
+   for (String actual : roleNames) {
+     normalizedActual.add(actual.toLowerCase());
+   }
+
+   for (String wanted : expectedRoleNames) {
+     boolean present = normalizedActual.contains(wanted.toLowerCase());
+     assertTrue("Expected role: " + wanted, present);
+   }
+ }
+
+ private void validateMissingParameterMsg(SentryShellKafka sentryShell, String[] args,
+ String expectedErrorMsg) throws Exception {
+ Set<String> errorMsgs = getShellResultWithOSRedirect(sentryShell, args, false);
+ assertTrue("Expected error message: " + expectedErrorMsg, errorMsgs.contains(expectedErrorMsg));
+ }
+
+ // Runs the shell expecting executeShell to return false, then asserts that at least one
+ // captured output line contains every fragment in expectedErrorMsgsContains.
+ private void validateMissingParameterMsgsContains(SentryShellKafka sentryShell, String[] args,
+     String ... expectedErrorMsgsContains) throws Exception {
+   Set<String> outputLines = getShellResultWithOSRedirect(sentryShell, args, false);
+   boolean matchedLine = false;
+   for (String line : outputLines) {
+     boolean containsEveryFragment = true;
+     for (String fragment : expectedErrorMsgsContains) {
+       if (!line.contains(fragment)) {
+         containsEveryFragment = false;
+         break;
+       }
+     }
+     if (containsEveryFragment) {
+       // One fully matching line is enough; stop scanning.
+       matchedLine = true;
+       break;
+     }
+   }
+   assertTrue(matchedLine);
+ }
+}
[2/2] sentry git commit: SENTRY-1188: Fixes to get kerberos auth
work. (Ashish K Singh, Reviewed by: Hao Hao)
Posted by ha...@apache.org.
SENTRY-1188: Fixes to get kerberos auth work. (Ashish K Singh, Reviewed by: Hao Hao)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/6d79016a
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/6d79016a
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/6d79016a
Branch: refs/heads/master
Commit: 6d79016aaf2c9179bea9171c393990a466248753
Parents: 70d0ecc
Author: hahao <ha...@cloudera.com>
Authored: Fri Apr 15 17:35:19 2016 -0700
Committer: hahao <ha...@cloudera.com>
Committed: Fri Apr 15 20:20:39 2016 -0700
----------------------------------------------------------------------
.../kafka/authorizer/SentryKafkaAuthorizer.java | 2 +-
.../sentry/kafka/binding/KafkaAuthBinding.java | 66 +++++++++++++++++++-
.../binding/KafkaAuthBindingSingleton.java | 5 +-
.../apache/sentry/kafka/conf/KafkaAuthConf.java | 8 ++-
.../policy/kafka/KafkaWildcardPrivilege.java | 2 +-
5 files changed, 77 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
index 3bce6cc..03f7b7f 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
@@ -117,7 +117,7 @@ public class SentryKafkaAuthorizer implements Authorizer {
}
LOG.info("Configuring Sentry KafkaAuthorizer: " + sentry_site);
final KafkaAuthBindingSingleton instance = KafkaAuthBindingSingleton.getInstance();
- instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site);
+ instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site, configs);
this.binding = instance.getAuthBinding();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index 8f4a8c4..c6600a0 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -16,6 +16,7 @@
*/
package org.apache.sentry.kafka.binding;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,6 +35,8 @@ import com.google.common.collect.Sets;
import kafka.network.RequestChannel;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.sentry.SentryUserException;
@@ -55,6 +58,7 @@ import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericService
import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
import org.apache.sentry.provider.db.generic.service.thrift.TSentryRole;
+import org.apache.sentry.service.thrift.ServiceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -64,12 +68,16 @@ import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.immutable.Map;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
public class KafkaAuthBinding {
private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class);
private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA;
private static final String COMPONENT_NAME = COMPONENT_TYPE;
+ private static Boolean kerberosInit;
+
private final Configuration authConf;
private final AuthorizationProvider authProvider;
private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
@@ -77,12 +85,14 @@ public class KafkaAuthBinding {
private ProviderBackend providerBackend;
private String instanceName;
private String requestorName;
+ private java.util.Map<String, ?> kafkaConfigs;
- public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf) throws Exception {
+ public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception {
this.instanceName = instanceName;
this.requestorName = requestorName;
this.authConf = authConf;
+ this.kafkaConfigs = kafkaConfigs;
this.authProvider = createAuthProvider();
}
@@ -118,6 +128,28 @@ public class KafkaAuthBinding {
+ providerBackendName);
}
+ // Initiate kerberos via UserGroupInformation if required.
+ // Config values are read as Objects and null-checked BEFORE being converted to
+ // strings: calling toString() directly on kafkaConfigs.get(...) throws a
+ // NullPointerException whenever a key is absent (the hostname entry is optional).
+ if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE))) {
+   if (kafkaConfigs != null) {
+     Object keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar());
+     Object principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar());
+     if (keytabProp != null && principalProp != null) {
+       Object actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar());
+       if (actualHost != null) {
+         // Resolve the principal against the configured host name.
+         principalProp = SecurityUtil.getServerPrincipal(principalProp.toString(), actualHost.toString());
+       }
+       initKerberos(keytabProp.toString(), principalProp.toString());
+     } else {
+       // String concatenation renders a missing value as "null" instead of throwing.
+       LOG.debug("Could not initialize Kerberos.\n" +
+           AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + keytabProp + "\n" +
+           AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + principalProp);
+     }
+   } else {
+     // Only reported when Kerberos is actually enabled but no Kafka config was passed in.
+     LOG.debug("Could not initialize Kerberos as no kafka config provided. " +
+         AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() +
+         " are required configs to be able to initialize Kerberos");
+   }
+ }
+
// Instantiate the configured providerBackend
Constructor<?> providerBackendConstructor =
Class.forName(providerBackendName)
@@ -495,4 +527,36 @@ public class KafkaAuthBinding {
return principalName;
}
}
+
+ /**
+  * Initialize kerberos via UserGroupInformation by logging in from the given keytab.
+  * Once a login has succeeded, subsequent calls have no effect. The initialised flag is
+  * recorded only after the login succeeds, so a failed attempt is not mistaken for a
+  * completed one and a later call will try again.
+  *
+  * @param keytabFile path of the keytab file; must be non-null and non-empty
+  * @param principal kerberos principal to log in as; must be non-null and non-empty
+  * @throws IllegalArgumentException if keytabFile or principal is null or empty
+  * @throws RuntimeException wrapping the IOException if the keytab login fails
+  */
+ private void initKerberos(String keytabFile, String principal) {
+   if (keytabFile == null || keytabFile.length() == 0) {
+     throw new IllegalArgumentException("keytabFile required because kerberos is enabled");
+   }
+   if (principal == null || principal.length() == 0) {
+     throw new IllegalArgumentException("principal required because kerberos is enabled");
+   }
+   // The flag is static, so guard it with the class lock.
+   synchronized (KafkaAuthBinding.class) {
+     if (kerberosInit == null) {
+       // let's avoid modifying the supplied configuration, just to be conservative
+       final Configuration ugiConf = new Configuration();
+       ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS);
+       UserGroupInformation.setConfiguration(ugiConf);
+       LOG.info(
+           "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ",
+           keytabFile, principal);
+       try {
+         UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
+       } catch (IOException ioe) {
+         throw new RuntimeException("Failed to login user with Principal: " + principal +
+             " and Keytab file: " + keytabFile, ioe);
+       }
+       // Mark as initialised only now that the login went through; use the shared
+       // constant rather than the deprecated Boolean(boolean) constructor.
+       kerberosInit = Boolean.TRUE;
+       LOG.info("Got Kerberos ticket");
+     }
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
index a0007a3..6555dae 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
@@ -18,6 +18,7 @@ package org.apache.sentry.kafka.binding;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.Map;
import org.apache.sentry.kafka.conf.KafkaAuthConf;
import org.slf4j.Logger;
@@ -56,10 +57,10 @@ public class KafkaAuthBindingSingleton {
return kafkaAuthConf;
}
- public void configure(String instanceName, String requestorName, String sentry_site) {
+ public void configure(String instanceName, String requestorName, String sentry_site, Map<String, ?> kafkaConfigs) {
try {
kafkaAuthConf = loadAuthzConf(sentry_site);
- binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf);
+ binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf, kafkaConfigs);
log.info("KafkaAuthBinding created successfully");
} catch (Exception ex) {
log.error("Unable to create KafkaAuthBinding", ex);
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
index e0d767e..0a57e2e 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
@@ -30,6 +30,9 @@ public class KafkaAuthConf extends Configuration {
public static final String KAFKA_SUPER_USERS = "kafka.superusers";
public static final String KAFKA_SERVICE_INSTANCE_NAME = "sentry.kafka.service.instance";
public static final String KAFKA_SERVICE_USER_NAME = "sentry.kafka.service.user.name";
+ public static final String KAFKA_PRINCIPAL_HOSTNAME = "sentry.kafka.principal.hostname";
+ public static final String KAFKA_PRINCIPAL_NAME = "sentry.kafka.kerberos.principal";
+ public static final String KAFKA_KEYTAB_FILE_NAME = "sentry.kafka.keytab.file";
/**
* Config setting definitions
@@ -40,7 +43,10 @@ public class KafkaAuthConf extends Configuration {
AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()),
AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()),
AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"),
- AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka");
+ AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka"),
+ AUTHZ_PRINCIPAL_HOSTNAME(KAFKA_PRINCIPAL_HOSTNAME, null),
+ AUTHZ_PRINCIPAL_NAME(KAFKA_PRINCIPAL_NAME, null),
+ AUTHZ_KEYTAB_FILE_NAME(KAFKA_KEYTAB_FILE_NAME, null);
private final String varName;
private final String defaultVal;
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
index bc299b0..6803a46 100644
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
@@ -121,7 +121,7 @@ public class KafkaWildcardPrivilege implements Privilege {
if (KafkaActionConstant.actionName.equalsIgnoreCase(policyPart.getKey())) { // is action
return policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL) ||
- policyPart.equals(requestPart);
+ policyPart.getValue().equalsIgnoreCase(requestPart.getValue());
} else {
return policyPart.getValue().equals(requestPart.getValue());
}