You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2016/04/16 05:21:03 UTC
[2/2] sentry git commit: SENTRY-1188: Fixes to get kerberos auth
work. (Ashish K Singh, Reviewed by: Hao Hao)
SENTRY-1188: Fixes to get kerberos auth work. (Ashish K Singh, Reviewed by: Hao Hao)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/6d79016a
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/6d79016a
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/6d79016a
Branch: refs/heads/master
Commit: 6d79016aaf2c9179bea9171c393990a466248753
Parents: 70d0ecc
Author: hahao <ha...@cloudera.com>
Authored: Fri Apr 15 17:35:19 2016 -0700
Committer: hahao <ha...@cloudera.com>
Committed: Fri Apr 15 20:20:39 2016 -0700
----------------------------------------------------------------------
.../kafka/authorizer/SentryKafkaAuthorizer.java | 2 +-
.../sentry/kafka/binding/KafkaAuthBinding.java | 66 +++++++++++++++++++-
.../binding/KafkaAuthBindingSingleton.java | 5 +-
.../apache/sentry/kafka/conf/KafkaAuthConf.java | 8 ++-
.../policy/kafka/KafkaWildcardPrivilege.java | 2 +-
5 files changed, 77 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
index 3bce6cc..03f7b7f 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
@@ -117,7 +117,7 @@ public class SentryKafkaAuthorizer implements Authorizer {
}
LOG.info("Configuring Sentry KafkaAuthorizer: " + sentry_site);
final KafkaAuthBindingSingleton instance = KafkaAuthBindingSingleton.getInstance();
- instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site);
+ instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site, configs);
this.binding = instance.getAuthBinding();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index 8f4a8c4..c6600a0 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -16,6 +16,7 @@
*/
package org.apache.sentry.kafka.binding;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,6 +35,8 @@ import com.google.common.collect.Sets;
import kafka.network.RequestChannel;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.sentry.SentryUserException;
@@ -55,6 +58,7 @@ import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericService
import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
import org.apache.sentry.provider.db.generic.service.thrift.TSentryRole;
+import org.apache.sentry.service.thrift.ServiceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -64,12 +68,16 @@ import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.immutable.Map;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
public class KafkaAuthBinding {
private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class);
private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA;
private static final String COMPONENT_NAME = COMPONENT_TYPE;
+ private static Boolean kerberosInit;
+
private final Configuration authConf;
private final AuthorizationProvider authProvider;
private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
@@ -77,12 +85,14 @@ public class KafkaAuthBinding {
private ProviderBackend providerBackend;
private String instanceName;
private String requestorName;
+ private java.util.Map<String, ?> kafkaConfigs;
- public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf) throws Exception {
+ public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception {
this.instanceName = instanceName;
this.requestorName = requestorName;
this.authConf = authConf;
+ this.kafkaConfigs = kafkaConfigs;
this.authProvider = createAuthProvider();
}
@@ -118,6 +128,28 @@ public class KafkaAuthBinding {
+ providerBackendName);
}
+ // Initiate kerberos via UserGroupInformation if required
+ if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE))
+ && kafkaConfigs != null) {
+ String keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString();
+ String principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString();
+ if (keytabProp != null && principalProp != null) {
+ String actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar()).toString();
+ if (actualHost != null) {
+ principalProp = SecurityUtil.getServerPrincipal(principalProp, actualHost);
+ }
+ initKerberos(keytabProp, principalProp);
+ } else {
+ LOG.debug("Could not initialize Kerberos.\n" +
+ AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString() + "\n" +
+ AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString());
+ }
+ } else {
+ LOG.debug("Could not initialize Kerberos as no kafka config provided. " +
+ AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() +
+ " are required configs to be able to initialize Kerberos");
+ }
+
// Instantiate the configured providerBackend
Constructor<?> providerBackendConstructor =
Class.forName(providerBackendName)
@@ -495,4 +527,36 @@ public class KafkaAuthBinding {
return principalName;
}
}
+
+ /**
+ * Initialize kerberos via UserGroupInformation. Will only attempt to login
+ * during the first request, subsequent calls will have no effect.
+ */
+ private void initKerberos(String keytabFile, String principal) {
+ if (keytabFile == null || keytabFile.length() == 0) {
+ throw new IllegalArgumentException("keytabFile required because kerberos is enabled");
+ }
+ if (principal == null || principal.length() == 0) {
+ throw new IllegalArgumentException("principal required because kerberos is enabled");
+ }
+ synchronized (KafkaAuthBinding.class) {
+ if (kerberosInit == null) {
+ kerberosInit = new Boolean(true);
+ // let's avoid modifying the supplied configuration, just to be conservative
+ final Configuration ugiConf = new Configuration();
+ ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS);
+ UserGroupInformation.setConfiguration(ugiConf);
+ LOG.info(
+ "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ",
+ keytabFile, principal);
+ try {
+ UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to login user with Principal: " + principal +
+ " and Keytab file: " + keytabFile, ioe);
+ }
+ LOG.info("Got Kerberos ticket");
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
index a0007a3..6555dae 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
@@ -18,6 +18,7 @@ package org.apache.sentry.kafka.binding;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.Map;
import org.apache.sentry.kafka.conf.KafkaAuthConf;
import org.slf4j.Logger;
@@ -56,10 +57,10 @@ public class KafkaAuthBindingSingleton {
return kafkaAuthConf;
}
- public void configure(String instanceName, String requestorName, String sentry_site) {
+ public void configure(String instanceName, String requestorName, String sentry_site, Map<String, ?> kafkaConfigs) {
try {
kafkaAuthConf = loadAuthzConf(sentry_site);
- binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf);
+ binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf, kafkaConfigs);
log.info("KafkaAuthBinding created successfully");
} catch (Exception ex) {
log.error("Unable to create KafkaAuthBinding", ex);
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
index e0d767e..0a57e2e 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
@@ -30,6 +30,9 @@ public class KafkaAuthConf extends Configuration {
public static final String KAFKA_SUPER_USERS = "kafka.superusers";
public static final String KAFKA_SERVICE_INSTANCE_NAME = "sentry.kafka.service.instance";
public static final String KAFKA_SERVICE_USER_NAME = "sentry.kafka.service.user.name";
+ public static final String KAFKA_PRINCIPAL_HOSTNAME = "sentry.kafka.principal.hostname";
+ public static final String KAFKA_PRINCIPAL_NAME = "sentry.kafka.kerberos.principal";
+ public static final String KAFKA_KEYTAB_FILE_NAME = "sentry.kafka.keytab.file";
/**
* Config setting definitions
@@ -40,7 +43,10 @@ public class KafkaAuthConf extends Configuration {
AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()),
AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()),
AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"),
- AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka");
+ AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka"),
+ AUTHZ_PRINCIPAL_HOSTNAME(KAFKA_PRINCIPAL_HOSTNAME, null),
+ AUTHZ_PRINCIPAL_NAME(KAFKA_PRINCIPAL_NAME, null),
+ AUTHZ_KEYTAB_FILE_NAME(KAFKA_KEYTAB_FILE_NAME, null);
private final String varName;
private final String defaultVal;
http://git-wip-us.apache.org/repos/asf/sentry/blob/6d79016a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
index bc299b0..6803a46 100644
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
@@ -121,7 +121,7 @@ public class KafkaWildcardPrivilege implements Privilege {
if (KafkaActionConstant.actionName.equalsIgnoreCase(policyPart.getKey())) { // is action
return policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL) ||
- policyPart.equals(requestPart);
+ policyPart.getValue().equalsIgnoreCase(requestPart.getValue());
} else {
return policyPart.getValue().equals(requestPart.getValue());
}