You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2020/05/30 19:58:55 UTC
[atlas] 02/02: ATLAS-3779: fallback to KafkaClient jaas
configuration when ticketBased-KafkaClient is not specified
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 09e382b9a0a5f48e3551db69996ad0b1e04cf79e
Author: Jayendra Parab <ja...@gmail.com>
AuthorDate: Sat May 30 16:20:59 2020 +0530
ATLAS-3779: fallback to KafkaClient jaas configuration when ticketBased-KafkaClient is not specified
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
(cherry picked from commit 7a8ca51f83bdd15bf5290f718bd6a79e16eca61b)
---
.../org/apache/atlas/kafka/KafkaNotification.java | 26 +++++---
.../atlas/kafka/KafkaNotificationMockTest.java | 71 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 8 deletions(-)
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 278b3a7..05fd977 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -29,8 +29,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.alias.CredentialProvider;
-import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -431,12 +429,22 @@ public class KafkaNotification extends AbstractNotification implements Service {
// Required for backward compatability for Hive CLI
if (!isLoginKeytabBased() && isLoginTicketBased()) {
- LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
- jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+ LOG.debug("Checking if ticketBased-KafkaClient is set");
+ // if ticketBased-KafkaClient property is not specified then use the default client name
+ String ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
+ Configuration ticketBasedConfig = configuration.subset(ticketBasedConfigPrefix);
+
+ if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
+ LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
+
+ jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+ } else {
+ LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
+ }
}
- String keyPrefix = jaasClientName + ".";
- String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+ String keyPrefix = jaasClientName + ".";
+ String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
String loginModuleName = jaasConfig.getProperty(keyParam);
if (loginModuleName == null) {
@@ -483,7 +491,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
}
- private static boolean isLoginKeytabBased() {
+ @VisibleForTesting
+ boolean isLoginKeytabBased() {
boolean ret = false;
try {
@@ -495,7 +504,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
return ret;
}
- private static boolean isLoginTicketBased() {
+ @VisibleForTesting
+ boolean isLoginTicketBased() {
boolean ret = false;
try {
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index e345c8b..51c5a0d 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.io.IOException;
@@ -276,6 +277,76 @@ public class KafkaNotificationMockTest {
}
+ @Test
+ public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+ configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
+ configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab", optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey", optionStoreKey);
+ configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+ KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
+ when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+ when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+ spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+ assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
+ Properties properties = new Properties();
+ Configuration configuration = new PropertiesConfiguration();
+
+ final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+ final String loginModuleControlFlag = "required";
+ final String optionUseKeyTab = "false";
+ final String optionStoreKey = "true";
+ final String optionServiceName = "kafka";
+
+ configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+ configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+ configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+ try {
+ KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+ KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
+ when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+ when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+ spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
+ String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+ assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+ assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+ assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+ assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+ } catch (AtlasException e) {
+ fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+ }
+ }
+
class TestKafkaNotification extends KafkaNotification {
private final AtlasKafkaConsumer consumer1;