You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2020/05/30 19:58:55 UTC

[atlas] 02/02: ATLAS-3779: fallback to KafkaClient JAAS configuration when ticketBased-KafkaClient is not specified

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 09e382b9a0a5f48e3551db69996ad0b1e04cf79e
Author: Jayendra Parab <ja...@gmail.com>
AuthorDate: Sat May 30 16:20:59 2020 +0530

    ATLAS-3779: fallback to KafkaClient JAAS configuration when ticketBased-KafkaClient is not specified
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
    (cherry picked from commit 7a8ca51f83bdd15bf5290f718bd6a79e16eca61b)
---
 .../org/apache/atlas/kafka/KafkaNotification.java  | 26 +++++---
 .../atlas/kafka/KafkaNotificationMockTest.java     | 71 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 8 deletions(-)

diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 278b3a7..05fd977 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -29,8 +29,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.alias.CredentialProvider;
-import org.apache.hadoop.security.alias.CredentialProviderFactory;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -431,12 +429,22 @@ public class KafkaNotification extends AbstractNotification implements Service {
 
             // Required for backward compatability for Hive CLI
             if (!isLoginKeytabBased() && isLoginTicketBased()) {
-                LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
-                jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+                LOG.debug("Checking if ticketBased-KafkaClient is set");
+                // if ticketBased-KafkaClient property is not specified then use the default client name
+                String        ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
+                Configuration ticketBasedConfig       = configuration.subset(ticketBasedConfigPrefix);
+
+                if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
+                    LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
+
+                    jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+                } else {
+                    LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
+                }
             }
-            String keyPrefix = jaasClientName + ".";
 
-            String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+            String keyPrefix       = jaasClientName + ".";
+            String keyParam        = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
             String loginModuleName = jaasConfig.getProperty(keyParam);
 
             if (loginModuleName == null) {
@@ -483,7 +491,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
         LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
     }
 
-    private static boolean isLoginKeytabBased() {
+    @VisibleForTesting
+    boolean isLoginKeytabBased() {
         boolean ret = false;
 
         try {
@@ -495,7 +504,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
         return ret;
     }
 
-    private static boolean isLoginTicketBased() {
+    @VisibleForTesting
+    boolean isLoginTicketBased() {
         boolean ret = false;
 
         try {
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index e345c8b..51c5a0d 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -276,6 +277,76 @@ public class KafkaNotificationMockTest {
 
     }
 
+    @Test
+    public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+
+        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
+
+        try {
+            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+            KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
+            when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+            when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+            spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+
+        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+        try {
+            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+            KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
+            when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+            when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+            spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+        }
+    }
+
     class TestKafkaNotification extends KafkaNotification {
 
         private final AtlasKafkaConsumer consumer1;