You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2016/04/27 03:30:39 UTC

[4/6] sentry git commit: SENTRY-1188: Fixes to get kerberos auth work. (Ashish K Singh, Reviewed by: Hao Hao)

SENTRY-1188: Fixes to get kerberos auth work. (Ashish K Singh, Reviewed by: Hao Hao)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/00951b47
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/00951b47
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/00951b47

Branch: refs/heads/branch-1.7.0
Commit: 00951b4774ee34c1eaf1a88fab02a866e6c1cf19
Parents: 39d38a5
Author: hahao <ha...@cloudera.com>
Authored: Fri Apr 15 17:35:19 2016 -0700
Committer: hahao <ha...@cloudera.com>
Committed: Tue Apr 26 18:26:57 2016 -0700

----------------------------------------------------------------------
 .../kafka/authorizer/SentryKafkaAuthorizer.java |  2 +-
 .../sentry/kafka/binding/KafkaAuthBinding.java  | 66 +++++++++++++++++++-
 .../binding/KafkaAuthBindingSingleton.java      |  5 +-
 .../apache/sentry/kafka/conf/KafkaAuthConf.java |  8 ++-
 .../policy/kafka/KafkaWildcardPrivilege.java    |  2 +-
 5 files changed, 77 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/00951b47/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
index 3bce6cc..03f7b7f 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java
@@ -117,7 +117,7 @@ public class SentryKafkaAuthorizer implements Authorizer {
     }
     LOG.info("Configuring Sentry KafkaAuthorizer: " + sentry_site);
     final KafkaAuthBindingSingleton instance = KafkaAuthBindingSingleton.getInstance();
-    instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site);
+    instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site, configs);
     this.binding = instance.getAuthBinding();
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/00951b47/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
index 8f4a8c4..c6600a0 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java
@@ -16,6 +16,7 @@
  */
 package org.apache.sentry.kafka.binding;
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,6 +35,8 @@ import com.google.common.collect.Sets;
 import kafka.network.RequestChannel;
 import kafka.security.auth.Operation;
 import kafka.security.auth.Resource;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.sentry.SentryUserException;
@@ -55,6 +58,7 @@ import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericService
 import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
 import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
 import org.apache.sentry.provider.db.generic.service.thrift.TSentryRole;
+import org.apache.sentry.service.thrift.ServiceConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -64,12 +68,16 @@ import scala.collection.Iterator;
 import scala.collection.JavaConversions;
 import scala.collection.immutable.Map;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
 public class KafkaAuthBinding {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class);
     private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA;
     private static final String COMPONENT_NAME = COMPONENT_TYPE;
 
+    private static Boolean kerberosInit;
+
     private final Configuration authConf;
     private final AuthorizationProvider authProvider;
     private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance();
@@ -77,12 +85,14 @@ public class KafkaAuthBinding {
     private ProviderBackend providerBackend;
     private String instanceName;
     private String requestorName;
+    private java.util.Map<String, ?> kafkaConfigs;
 
 
-    public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf) throws Exception {
+    public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception {
         this.instanceName = instanceName;
         this.requestorName = requestorName;
         this.authConf = authConf;
+        this.kafkaConfigs = kafkaConfigs;
         this.authProvider = createAuthProvider();
     }
 
@@ -118,6 +128,28 @@ public class KafkaAuthBinding {
                 + providerBackendName);
         }
 
+        // Initiate kerberos via UserGroupInformation if required
+        if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE))
+                && kafkaConfigs != null) {
+            String keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString();
+            String principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString();
+            if (keytabProp != null && principalProp != null) {
+                String actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar()).toString();
+                if (actualHost != null) {
+                    principalProp = SecurityUtil.getServerPrincipal(principalProp, actualHost);
+                }
+                initKerberos(keytabProp, principalProp);
+            } else {
+                LOG.debug("Could not initialize Kerberos.\n" +
+                        AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString() + "\n" +
+                        AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString());
+            }
+        } else {
+            LOG.debug("Could not initialize Kerberos as no kafka config provided. " +
+                    AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() +
+                    " are required configs to be able to initialize Kerberos");
+        }
+
         // Instantiate the configured providerBackend
         Constructor<?> providerBackendConstructor =
             Class.forName(providerBackendName)
@@ -495,4 +527,36 @@ public class KafkaAuthBinding {
             return principalName;
         }
     }
+
+    /**
+     * Logs in to Kerberos from the keytab via UserGroupInformation. The login runs once; calls after a
+     * success are no-ops. Throws IllegalArgumentException on a null/empty argument, RuntimeException on login failure.
+     */
+    private void initKerberos(String keytabFile, String principal) {
+        if (keytabFile == null || keytabFile.isEmpty()) {
+            throw new IllegalArgumentException("keytabFile required because kerberos is enabled");
+        }
+        if (principal == null || principal.isEmpty()) {
+            throw new IllegalArgumentException("principal required because kerberos is enabled");
+        }
+        synchronized (KafkaAuthBinding.class) {
+            if (kerberosInit == null) {
+                // use a fresh Configuration so the supplied configuration is not modified
+                final Configuration ugiConf = new Configuration();
+                ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS);
+                UserGroupInformation.setConfiguration(ugiConf);
+                LOG.info(
+                        "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ",
+                        keytabFile, principal);
+                try {
+                    UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
+                } catch (IOException ioe) {
+                    throw new RuntimeException("Failed to login user with Principal: " + principal +
+                            " and Keytab file: " + keytabFile, ioe);
+                }
+                kerberosInit = Boolean.TRUE; // set only after a successful login so a failure is not latched
+                LOG.info("Got Kerberos ticket");
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/00951b47/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
index a0007a3..6555dae 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java
@@ -18,6 +18,7 @@ package org.apache.sentry.kafka.binding;
 
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Map;
 
 import org.apache.sentry.kafka.conf.KafkaAuthConf;
 import org.slf4j.Logger;
@@ -56,10 +57,10 @@ public class KafkaAuthBindingSingleton {
     return kafkaAuthConf;
   }
 
-  public void configure(String instanceName, String requestorName, String sentry_site) {
+  public void configure(String instanceName, String requestorName, String sentry_site, Map<String, ?> kafkaConfigs) {
     try {
       kafkaAuthConf = loadAuthzConf(sentry_site);
-      binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf);
+      binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf, kafkaConfigs);
       log.info("KafkaAuthBinding created successfully");
     } catch (Exception ex) {
       log.error("Unable to create KafkaAuthBinding", ex);

http://git-wip-us.apache.org/repos/asf/sentry/blob/00951b47/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
index e0d767e..0a57e2e 100644
--- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
+++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java
@@ -30,6 +30,9 @@ public class KafkaAuthConf extends Configuration {
   public static final String KAFKA_SUPER_USERS = "kafka.superusers";
   public static final String KAFKA_SERVICE_INSTANCE_NAME = "sentry.kafka.service.instance";
   public static final String KAFKA_SERVICE_USER_NAME = "sentry.kafka.service.user.name";
+  public static final String KAFKA_PRINCIPAL_HOSTNAME = "sentry.kafka.principal.hostname";
+  public static final String KAFKA_PRINCIPAL_NAME = "sentry.kafka.kerberos.principal";
+  public static final String KAFKA_KEYTAB_FILE_NAME = "sentry.kafka.keytab.file";
 
   /**
    * Config setting definitions
@@ -40,7 +43,10 @@ public class KafkaAuthConf extends Configuration {
     AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()),
     AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()),
     AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"),
-    AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka");
+    AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka"),
+    AUTHZ_PRINCIPAL_HOSTNAME(KAFKA_PRINCIPAL_HOSTNAME, null),
+    AUTHZ_PRINCIPAL_NAME(KAFKA_PRINCIPAL_NAME, null),
+    AUTHZ_KEYTAB_FILE_NAME(KAFKA_KEYTAB_FILE_NAME, null);
 
     private final String varName;
     private final String defaultVal;

http://git-wip-us.apache.org/repos/asf/sentry/blob/00951b47/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
index bc299b0..6803a46 100644
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
@@ -121,7 +121,7 @@ public class KafkaWildcardPrivilege implements Privilege {
 
     if (KafkaActionConstant.actionName.equalsIgnoreCase(policyPart.getKey())) { // is action
       return policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL) ||
-          policyPart.equals(requestPart);
+          policyPart.getValue().equalsIgnoreCase(requestPart.getValue());
     } else {
       return policyPart.getValue().equals(requestPart.getValue());
     }