You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2020/05/27 08:55:59 UTC

[atlas] branch branch-2.0 updated: ATLAS-3779 : Refactoring Kafka in-memory JAASConfig in Atlas.

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new b873dd9  ATLAS-3779 : Refactoring Kafka in-memory JAASConfig in Atlas.
b873dd9 is described below

commit b873dd96ed32ed4731f50ea991bb51c2b43a8a9b
Author: Jayendra Parab <ja...@gmail.com>
AuthorDate: Mon May 25 14:31:43 2020 +0530

    ATLAS-3779 : Refactoring Kafka in-memory JAASConfig in Atlas.
    
    (cherry picked from commit 61abecac22ef3e9341a07be6d5354bf246544a3b)
---
 .../org/apache/atlas/ApplicationProperties.java    |   4 -
 .../main/java/org/apache/atlas/hook/AtlasHook.java |  30 -----
 .../org/apache/atlas/kafka/KafkaNotification.java  | 126 ++++++++++++++++++++
 .../atlas/kafka/KafkaNotificationMockTest.java     | 132 +++++++++++++++++++++
 4 files changed, 258 insertions(+), 34 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
index 3ba5061..e662c8f 100644
--- a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -17,7 +17,6 @@
  */
 package org.apache.atlas;
 
-import org.apache.atlas.security.InMemoryJAASConfiguration;
 import org.apache.atlas.security.SecurityUtil;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
@@ -109,10 +108,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
     public static Configuration set(Configuration configuration) throws AtlasException {
         synchronized (ApplicationProperties.class) {
             instance = configuration;
-
-            InMemoryJAASConfiguration.init(instance);
         }
-
         return instance;
     }
 
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index cc6546b..8659126 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -93,12 +93,6 @@ public abstract class AtlasHook {
             failedMessagesLogger = null;
         }
 
-        if (!isLoginKeytabBased()) {
-            if (isLoginTicketBased()) {
-                InMemoryJAASConfiguration.setConfigSectionRedirect("KafkaClient", "ticketBased-KafkaClient");
-            }
-        }
-
         metadataNamespace         = getMetadataNamespace(atlasProperties);
         notificationMaxRetries    = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
         notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
@@ -287,30 +281,6 @@ public abstract class AtlasHook {
         }
     }
 
-    private static boolean isLoginKeytabBased() {
-        boolean ret = false;
-
-        try {
-            ret = UserGroupInformation.isLoginKeytabBased();
-        } catch (Exception excp) {
-            LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
-        }
-
-        return ret;
-    }
-
-    private static boolean isLoginTicketBased() {
-        boolean ret = false;
-
-        try {
-            ret = UserGroupInformation.isLoginTicketBased();
-        } catch (Exception excp) {
-            LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
-        }
-
-        return ret;
-    }
-
     private static String getMetadataNamespace(Configuration config) {
         return config.getString(CONF_METADATA_NAMESPACE, getClusterName(config));
     }
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 11a29b9..278b3a7 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -28,6 +28,7 @@ import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.alias.CredentialProvider;
 import org.apache.hadoop.security.alias.CredentialProviderFactory;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -64,6 +65,17 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public    static final String ATLAS_ENTITIES_TOPIC       = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
     protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
 
+    static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
+    private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
+    private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
+    private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
+    private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
+    private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
+    private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
+    private static final String JAAS_PRINCIPAL_PROP = "principal";
+    private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
+    private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
+
     private   static final String[] ATLAS_HOOK_CONSUMER_TOPICS     = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
     private   static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
 
@@ -134,6 +146,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
         // if no value is specified for max.poll.records, set to 1
         properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1));
 
+        setKafkaJAASProperties(applicationProperties, properties);
+
         LOG.info("<== KafkaNotification()");
     }
 
@@ -401,4 +415,116 @@ public class KafkaNotification extends AbstractNotification implements Service {
 
         return ret;
     }
+
+    /**
+     * Builds Kafka's {@code sasl.jaas.config} value from the {@code atlas.jaas.<client>.*} entries of
+     * {@code configuration} and stores it in {@code kafkaProperties}, unless that key is already set.
+     *
+     * @param configuration   configuration that may contain {@code atlas.jaas.*} entries
+     * @param kafkaProperties Kafka client properties that receive the generated entry
+     */
+    void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
+        LOG.debug("==> KafkaNotification.setKafkaJAASProperties()");
+
+        if (kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
+            LOG.debug("JAAS config is already set, returning");
+            return;
+        }
+
+        Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
+        // Only build sasl.jaas.config when atlas.jaas.* properties are present
+        if (jaasConfig != null && !jaasConfig.isEmpty()) {
+            String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
+            // Required for backward compatibility for Hive CLI
+            if (!isLoginKeytabBased() && isLoginTicketBased()) {
+                LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
+                jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+            }
+            String keyPrefix = jaasClientName + ".";
+            String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+            String loginModuleName = jaasConfig.getProperty(keyParam);
+            if (loginModuleName == null) {
+                LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
+                return;
+            }
+
+            keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
+            String configuredFlag = jaasConfig.getProperty(keyParam);
+            String controlFlag = configuredFlag == null ? "" : configuredFlag.trim();
+            // Fall back to the default for a missing or unrecognised flag, and report the value that was configured
+            if (!controlFlag.toLowerCase(java.util.Locale.ROOT).matches(JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS)) {
+                LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, configuredFlag, JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS);
+                controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
+            }
+
+            String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
+            String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
+            StringBuilder options = new StringBuilder();
+            for (String key : jaasConfig.stringPropertyNames()) {
+                if (key.startsWith(optionPrefix)) {
+                    String optionVal = jaasConfig.getProperty(key);
+                    if (optionVal != null) {
+                        optionVal = optionVal.trim();
+                        try {
+                            if (key.equalsIgnoreCase(principalOptionKey)) {
+                                optionVal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String) null);
+                            }
+                        } catch (IOException e) {
+                            LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal, e);
+                        }
+                        options.append(String.format(" %s=%s", key.substring(optionPrefix.length()), surroundWithQuotes(optionVal)));
+                    }
+                }
+            }
+            String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, options.toString());
+            kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
+        }
+
+        LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
+    }
+
+    /**
+     * Reports whether the current Hadoop login is keytab based.
+     * A failure while querying UserGroupInformation is logged and treated as "not keytab based".
+     */
+    private static boolean isLoginKeytabBased() {
+        try {
+            return UserGroupInformation.isLoginKeytabBased();
+        } catch (Exception excp) {
+            LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
+            return false;
+        }
+    }
+
+    /**
+     * Reports whether the current Hadoop login is ticket based.
+     * A failure while querying UserGroupInformation is logged and treated as "not ticket based".
+     */
+    private static boolean isLoginTicketBased() {
+        try {
+            return UserGroupInformation.isLoginTicketBased();
+        } catch (Exception excp) {
+            LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
+            return false;
+        }
+    }
+
+    /**
+     * Wraps {@code optionVal} in double quotes when it contains one of {@code /!@#%^&*}, so that Kafka can parse
+     * it. Null, empty and already double-quoted values are returned unchanged.
+     */
+    private static String surroundWithQuotes(String optionVal) {
+        if (StringUtils.isEmpty(optionVal)) {
+            return optionVal;
+        }
+
+        // charAt() reads the boundary characters; indexOf(int) would search for a character code instead
+        boolean alreadyQuoted = optionVal.length() > 1
+                && optionVal.charAt(0) == '"' && optionVal.charAt(optionVal.length() - 1) == '"';
+        if (alreadyQuoted) {
+            return optionVal;
+        }
+
+        final String SPECIAL_CHAR_LIST = "/!@#%^&*";
+        return StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST) ? String.format("\"%s\"", optionVal) : optionVal;
+    }
+
 }
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 9b5891f..e345c8b 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -17,14 +17,19 @@
  */
 package org.apache.atlas.kafka;
 
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.testng.annotations.Test;
+
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,6 +43,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -144,6 +150,132 @@ public class KafkaNotificationMockTest {
         }
     }
 
+    @Test
+    public void testSetKafkaJAASPropertiesForAllProperValues() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+        // Values that are configured below and expected to show up in the generated sasl.jaas.config
+        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+        // Complete atlas.jaas.KafkaClient.* configuration: module name, control flag and three options
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+        // Build the JAAS property into the initially empty Properties object created above
+        try {
+            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+            kafkaNotification.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            // Every configured piece must appear in the generated value
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testSetKafkaJAASPropertiesForMissingControlFlag() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+        // loginModuleControlFlag is only the expected value here; it is never written to the configuration
+        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+        // Configuration without atlas.jaas.KafkaClient.loginModuleControlFlag
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+        // Build the JAAS property into the initially empty Properties object created above
+        try {
+            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+            kafkaNotification.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            // The generated value must still contain "required" even though no control flag was configured
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testSetKafkaJAASPropertiesForMissingLoginModuleName() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+        // No loginModuleName value is declared: this test leaves that property unset
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+        // Configuration without atlas.jaas.KafkaClient.loginModuleName
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+        // Build the JAAS property into the initially empty Properties object created above
+        try {
+            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+            kafkaNotification.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            // Without a login module name no sasl.jaas.config entry may be written
+            assertNull(newPropertyValue);
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testSetKafkaJAASPropertiesWithSpecialCharacters() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+        // Option values containing "/" and "@", which the assertions below expect to be double-quoted
+        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionKeyTabPath = "/path/to/file.keytab";
+        final String optionPrincipal = "test/_HOST@EXAMPLE.COM";
+        // Configure module name, control flag and the two options with special characters
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.keyTabPath", optionKeyTabPath);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.principal", optionPrincipal);
+        // The expected principal is computed with SecurityUtil.getServerPrincipal rather than hard-coded
+        try {
+            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+            kafkaNotification.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            String updatedPrincipalValue = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionPrincipal, (String) null);
+            // Both special-character values must appear wrapped in double quotes
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("keyTabPath=\"" + optionKeyTabPath + "\""));
+            assertTrue(newPropertyValue.contains("principal=\""+ updatedPrincipalValue + "\""));
+
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+        } catch (IOException e) {
+            fail("Failed while getting updated principal value with exception : " + e.getMessage());
+        }
+
+    }
+
     class TestKafkaNotification extends KafkaNotification {
 
         private final AtlasKafkaConsumer consumer1;