You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2020/05/27 08:55:59 UTC
[atlas] branch branch-2.0 updated: ATLAS-3779 : Refactoring Kafka
in-memory JAASConfig in Atlas.
This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new b873dd9 ATLAS-3779 : Refactoring Kafka in-memory JAASConfig in Atlas.
b873dd9 is described below
commit b873dd96ed32ed4731f50ea991bb51c2b43a8a9b
Author: Jayendra Parab <ja...@gmail.com>
AuthorDate: Mon May 25 14:31:43 2020 +0530
ATLAS-3779 : Refactoring Kafka in-memory JAASConfig in Atlas.
(cherry picked from commit 61abecac22ef3e9341a07be6d5354bf246544a3b)
---
.../org/apache/atlas/ApplicationProperties.java | 4 -
.../main/java/org/apache/atlas/hook/AtlasHook.java | 30 -----
.../org/apache/atlas/kafka/KafkaNotification.java | 126 ++++++++++++++++++++
.../atlas/kafka/KafkaNotificationMockTest.java | 132 +++++++++++++++++++++
4 files changed, 258 insertions(+), 34 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
index 3ba5061..e662c8f 100644
--- a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -17,7 +17,6 @@
*/
package org.apache.atlas;
-import org.apache.atlas.security.InMemoryJAASConfiguration;
import org.apache.atlas.security.SecurityUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
@@ -109,10 +108,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static Configuration set(Configuration configuration) throws AtlasException {
synchronized (ApplicationProperties.class) {
instance = configuration;
-
- InMemoryJAASConfiguration.init(instance);
}
-
return instance;
}
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index cc6546b..8659126 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -93,12 +93,6 @@ public abstract class AtlasHook {
failedMessagesLogger = null;
}
- if (!isLoginKeytabBased()) {
- if (isLoginTicketBased()) {
- InMemoryJAASConfiguration.setConfigSectionRedirect("KafkaClient", "ticketBased-KafkaClient");
- }
- }
-
metadataNamespace = getMetadataNamespace(atlasProperties);
notificationMaxRetries = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
@@ -287,30 +281,6 @@ public abstract class AtlasHook {
}
}
- private static boolean isLoginKeytabBased() {
- boolean ret = false;
-
- try {
- ret = UserGroupInformation.isLoginKeytabBased();
- } catch (Exception excp) {
- LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
- }
-
- return ret;
- }
-
- private static boolean isLoginTicketBased() {
- boolean ret = false;
-
- try {
- ret = UserGroupInformation.isLoginTicketBased();
- } catch (Exception excp) {
- LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
- }
-
- return ret;
- }
-
private static String getMetadataNamespace(Configuration config) {
return config.getString(CONF_METADATA_NAMESPACE, getClusterName(config));
}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 11a29b9..278b3a7 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -28,6 +28,7 @@ import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -64,6 +65,17 @@ public class KafkaNotification extends AbstractNotification implements Service {
public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
+ // Kafka property key under which setKafkaJAASProperties() stores the generated JAAS entry
+ static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
+ // Prefix used to select the JAAS-related subset of the application configuration
+ private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
+ // Per-client key suffixes: <clientName>.loginModuleName / <clientName>.loginModuleControlFlag
+ private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
+ private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
+ // Control flag used when none is configured
+ private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
+ // Only reported in the warning logged when the control flag is missing
+ private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
+ // Login-module options are read from keys of the form <clientName>.option.<optionName>
+ private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
+ // Option name whose value is passed through SecurityUtil.getServerPrincipal()
+ private static final String JAAS_PRINCIPAL_PROP = "principal";
+ // Client section names: the default one, and the one used for ticket-based (non-keytab) logins
+ private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
+ private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
+
private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
@@ -134,6 +146,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
// if no value is specified for max.poll.records, set to 1
properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1));
+ setKafkaJAASProperties(applicationProperties, properties);
+
LOG.info("<== KafkaNotification()");
}
@@ -401,4 +415,116 @@ public class KafkaNotification extends AbstractNotification implements Service {
return ret;
}
+
+    /**
+     * Builds the Kafka {@code sasl.jaas.config} property from the {@code atlas.jaas.*}
+     * entries of the given configuration and stores it in {@code kafkaProperties}.
+     *
+     * {@code kafkaProperties} is left untouched when it already contains
+     * {@code sasl.jaas.config}, when no {@code atlas.jaas.*} entries exist, or when the
+     * selected client section has no {@code loginModuleName}.
+     *
+     * @param configuration   configuration from which the atlas.jaas.* subset is read
+     * @param kafkaProperties Kafka properties that receive the generated sasl.jaas.config value
+     */
+    void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
+        LOG.debug("==> KafkaNotification.setKafkaJAASProperties()");
+
+        if (kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
+            LOG.debug("JAAS config is already set, returning");
+            return;
+        }
+
+        Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
+
+        // When JAAS settings are present, translate them into the sasl.jaas.config property
+        if (jaasConfig != null && !jaasConfig.isEmpty()) {
+            String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
+
+            // Required for backward compatibility for Hive CLI
+            if (!isLoginKeytabBased() && isLoginTicketBased()) {
+                LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
+                jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+            }
+
+            String keyPrefix       = jaasClientName + ".";
+            String keyParam        = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+            String loginModuleName = jaasConfig.getProperty(keyParam);
+
+            if (loginModuleName == null) {
+                LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
+                return;
+            }
+
+            keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
+
+            String controlFlag = jaasConfig.getProperty(keyParam);
+
+            if (StringUtils.isEmpty(controlFlag)) {
+                // Log the configured (missing or empty) value before replacing it with the default,
+                // so the warning reports what was actually found in the configuration
+                LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag, JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS);
+                controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
+            }
+
+            String        optionPrefix       = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
+            String        principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
+            int           optionPrefixLen    = optionPrefix.length();
+            StringBuilder options            = new StringBuilder(); // local accumulator, no synchronization needed
+
+            for (String key : jaasConfig.stringPropertyNames()) {
+                if (!key.startsWith(optionPrefix)) {
+                    continue;
+                }
+
+                String optionVal = jaasConfig.getProperty(key);
+
+                if (optionVal == null) {
+                    continue;
+                }
+
+                optionVal = optionVal.trim();
+
+                try {
+                    if (key.equalsIgnoreCase(principalOptionKey)) {
+                        optionVal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String) null);
+                    }
+                } catch (IOException e) {
+                    // Best effort: keep the configured principal, but record why resolution failed
+                    LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal, e);
+                }
+
+                optionVal = surroundWithQuotes(optionVal);
+                options.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
+            }
+
+            String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, options.toString());
+
+            kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
+        }
+
+        LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
+    }
+
+    /**
+     * Asks {@link UserGroupInformation} whether the current login is keytab based.
+     *
+     * @return the value reported by UserGroupInformation, or {@code false} if the lookup fails
+     */
+    private static boolean isLoginKeytabBased() {
+        try {
+            return UserGroupInformation.isLoginKeytabBased();
+        } catch (Exception excp) {
+            // A failed lookup is logged and treated as "not keytab based"
+            LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
+
+            return false;
+        }
+    }
+
+    /**
+     * Asks {@link UserGroupInformation} whether the current login is ticket based.
+     *
+     * @return the value reported by UserGroupInformation, or {@code false} if the lookup fails
+     */
+    private static boolean isLoginTicketBased() {
+        try {
+            return UserGroupInformation.isLoginTicketBased();
+        } catch (Exception excp) {
+            // A failed lookup is logged and treated as "not ticket based"
+            LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
+
+            return false;
+        }
+    }
+
+    /**
+     * Encloses a JAAS option value in double quotes when it contains any of the
+     * characters {@code / ! @ # % ^ & *}, so that Kafka can parse it.
+     *
+     * @param optionVal option value to inspect; may be null or empty
+     * @return the value unchanged when it is null, empty, already enclosed in double quotes,
+     *         or free of the listed characters; otherwise the value wrapped in double quotes
+     */
+    private static String surroundWithQuotes(String optionVal) {
+        if (StringUtils.isEmpty(optionVal)) {
+            return optionVal;
+        }
+
+        // A value already enclosed in double quotes must not be quoted again.
+        // The first and last characters are checked with startsWith/endsWith: String.indexOf(int)
+        // looks up a character code and returns an index, so it cannot be used to read the
+        // character at a given position.
+        boolean alreadyQuoted = optionVal.length() > 1 && optionVal.startsWith("\"") && optionVal.endsWith("\"");
+
+        if (alreadyQuoted) {
+            return optionVal;
+        }
+
+        // Characters that force quoting; '_' and '-' are deliberately not part of the list
+        final String SPECIAL_CHAR_LIST = "/!@#%^&*";
+
+        if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
+            return String.format("\"%s\"", optionVal);
+        }
+
+        return optionVal;
+    }
+
}
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 9b5891f..e345c8b 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -17,14 +17,19 @@
*/
package org.apache.atlas.kafka;
+import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
+
+import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,6 +43,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -144,6 +150,132 @@ public class KafkaNotificationMockTest {
}
}
+ /**
+ * Configures loginModuleName, loginModuleControlFlag and three options for the
+ * KafkaClient section and checks that each of them appears in the generated
+ * sasl.jaas.config value.
+ */
+ @Test
+ public void testSetKafkaJAASPropertiesForAllProperValues() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+ configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+ configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+ // Invoked on a fresh, empty Properties object so only this call populates it
+ kafkaNotification.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+ assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+ }
+
+ }
+
+ /**
+ * Leaves loginModuleControlFlag unset and checks that the generated sasl.jaas.config
+ * value still contains "required" along with the login module name and options.
+ */
+ @Test
+ public void testSetKafkaJAASPropertiesForMissingControlFlag() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+ // Not written to the configuration; used only as the expected default in the assertion
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+ configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+ kafkaNotification.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+ assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+ }
+
+ }
+
+ /**
+ * Leaves loginModuleName unset and checks that no sasl.jaas.config value is written
+ * to the supplied Properties object.
+ */
+ @Test
+ public void testSetKafkaJAASPropertiesForMissingLoginModuleName() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+ configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+ kafkaNotification.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ // Without a login module name the property must not have been set
+ assertNull(newPropertyValue);
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+ }
+
+ }
+
+ /**
+ * Configures option values containing '/' and '@' and checks that they appear
+ * enclosed in double quotes in the generated sasl.jaas.config value; the expected
+ * principal is the one returned by SecurityUtil.getServerPrincipal().
+ */
+ @Test
+ public void testSetKafkaJAASPropertiesWithSpecialCharacters() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionKeyTabPath = "/path/to/file.keytab";
+ final String optionPrincipal = "test/_HOST@EXAMPLE.COM";
+
+ configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+ configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.keyTabPath", optionKeyTabPath);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.principal", optionPrincipal);
+
+ try {
+ KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+ kafkaNotification.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+ // Same transformation the code under test applies to the principal option
+ String updatedPrincipalValue = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionPrincipal, (String) null);
+
+ assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+ assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+ assertTrue(newPropertyValue.contains("keyTabPath=\"" + optionKeyTabPath + "\""));
+ assertTrue(newPropertyValue.contains("principal=\""+ updatedPrincipalValue + "\""));
+
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+ } catch (IOException e) {
+ fail("Failed while getting updated principal value with exception : " + e.getMessage());
+ }
+
+ }
+
class TestKafkaNotification extends KafkaNotification {
private final AtlasKafkaConsumer consumer1;