You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2020/05/30 19:58:53 UTC

[atlas] branch branch-2.0 updated (3dc447e -> 09e382b)

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a change to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git.


    from 3dc447e  ATLAS-3803: Update tag propagation to NONE for relationshipDefs in aws_s3_v2 and azure_adls_gen2 models
     new ac3fd28  ATLAS-3806: Classifications information missing in notification events during entity update/delete
     new 09e382b  ATLAS-3779: fallback to KafkaClient jaas configiration when ticket-basedKafkaClient is not specified

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../model/instance/EntityMutationResponse.java     |  4 +-
 .../org/apache/atlas/kafka/KafkaNotification.java  | 26 +++++---
 .../atlas/kafka/KafkaNotificationMockTest.java     | 71 ++++++++++++++++++++++
 .../store/graph/v2/EntityGraphMapper.java          | 17 ++----
 4 files changed, 97 insertions(+), 21 deletions(-)


[atlas] 02/02: ATLAS-3779: fallback to KafkaClient jaas configiration when ticket-basedKafkaClient is not specified

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 09e382b9a0a5f48e3551db69996ad0b1e04cf79e
Author: Jayendra Parab <ja...@gmail.com>
AuthorDate: Sat May 30 16:20:59 2020 +0530

    ATLAS-3779: fallback to KafkaClient jaas configiration when ticket-basedKafkaClient is not specified
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
    (cherry picked from commit 7a8ca51f83bdd15bf5290f718bd6a79e16eca61b)
---
 .../org/apache/atlas/kafka/KafkaNotification.java  | 26 +++++---
 .../atlas/kafka/KafkaNotificationMockTest.java     | 71 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 8 deletions(-)

diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 278b3a7..05fd977 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -29,8 +29,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.alias.CredentialProvider;
-import org.apache.hadoop.security.alias.CredentialProviderFactory;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -431,12 +429,22 @@ public class KafkaNotification extends AbstractNotification implements Service {
 
             // Required for backward compatability for Hive CLI
             if (!isLoginKeytabBased() && isLoginTicketBased()) {
-                LOG.debug("Using ticketBased-KafkaClient JAAS configuration");
-                jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+                LOG.debug("Checking if ticketBased-KafkaClient is set");
+                // if ticketBased-KafkaClient property is not specified then use the default client name
+                String        ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
+                Configuration ticketBasedConfig       = configuration.subset(ticketBasedConfigPrefix);
+
+                if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
+                    LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
+
+                    jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+                } else {
+                    LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
+                }
             }
-            String keyPrefix = jaasClientName + ".";
 
-            String keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+            String keyPrefix       = jaasClientName + ".";
+            String keyParam        = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
             String loginModuleName = jaasConfig.getProperty(keyParam);
 
             if (loginModuleName == null) {
@@ -483,7 +491,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
         LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
     }
 
-    private static boolean isLoginKeytabBased() {
+    @VisibleForTesting
+    boolean isLoginKeytabBased() {
         boolean ret = false;
 
         try {
@@ -495,7 +504,8 @@ public class KafkaNotification extends AbstractNotification implements Service {
         return ret;
     }
 
-    private static boolean isLoginTicketBased() {
+    @VisibleForTesting
+    boolean isLoginTicketBased() {
         boolean ret = false;
 
         try {
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index e345c8b..51c5a0d 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -276,6 +277,76 @@ public class KafkaNotificationMockTest {
 
     }
 
+    @Test
+    public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+
+        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
+
+        try {
+            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+            KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
+            when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+            when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+            spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+
+        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+
+        try {
+            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
+            KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
+            when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
+            when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
+            spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+        } catch (AtlasException e) {
+            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
+        }
+    }
+
     class TestKafkaNotification extends KafkaNotification {
 
         private final AtlasKafkaConsumer consumer1;


[atlas] 01/02: ATLAS-3806: Classifications information missing in notification events during entity update/delete

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit ac3fd280914d266e1180b1c2413a9874eec9a06a
Author: sidmishra <si...@cloudera.com>
AuthorDate: Fri May 22 15:49:28 2020 -0700

    ATLAS-3806: Classifications information missing in notification events during entity update/delete
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
    (cherry picked from commit ac0cd87ad4818020d012ae5265759891c3860a9a)
---
 .../atlas/model/instance/EntityMutationResponse.java    |  4 ++--
 .../repository/store/graph/v2/EntityGraphMapper.java    | 17 ++++++-----------
 2 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
index d83f0e0..aa5e8a3 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
@@ -215,10 +215,10 @@ public class EntityMutationResponse {
 
     @JsonIgnore
     public void addEntity(EntityOperation op, AtlasEntityHeader header) {
-        // if an entity is already included in CREATE, ignore subsequent UPDATE, PARTIAL_UPDATE
+        // if an entity is already included in CREATE, update the header, to capture propagated classifications
         if (op == EntityOperation.UPDATE || op == EntityOperation.PARTIAL_UPDATE) {
             if (entityHeaderExists(getCreatedEntities(), header.getGuid())) {
-                return;
+                op = EntityOperation.CREATE;
             }
         }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index ec23db4..779de2a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -275,7 +275,7 @@ public class EntityGraphMapper {
                 mapAttributes(createdEntity, entityType, vertex, CREATE, context);
                 setCustomAttributes(vertex,createdEntity);
 
-                resp.addEntity(CREATE, constructHeader(createdEntity, entityType, vertex));
+                resp.addEntity(CREATE, constructHeader(createdEntity, vertex));
                 addClassifications(context, guid, createdEntity.getClassifications());
 
                 addOrUpdateBusinessAttributes(vertex, entityType, createdEntity.getBusinessAttributes());
@@ -297,7 +297,7 @@ public class EntityGraphMapper {
                 mapAttributes(updatedEntity, entityType, vertex, updateType, context);
                 setCustomAttributes(vertex,updatedEntity);
 
-                resp.addEntity(updateType, constructHeader(updatedEntity, entityType, vertex));
+                resp.addEntity(updateType, constructHeader(updatedEntity, vertex));
 
                 if (replaceClassifications) {
                     deleteClassifications(guid);
@@ -1843,15 +1843,10 @@ public class EntityGraphMapper {
         }
     }
 
-    private AtlasEntityHeader constructHeader(AtlasEntity entity, final AtlasEntityType type, AtlasVertex vertex) {
-        AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName());
-
-        header.setGuid(getIdFromVertex(vertex));
-        header.setStatus(entity.getStatus());
-        header.setIsIncomplete(entity.getIsIncomplete());
-
-        for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
-            header.setAttribute(attribute.getName(), entity.getAttribute(attribute.getName()));
+    private AtlasEntityHeader constructHeader(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
+        AtlasEntityHeader header = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
+        if (entity.getClassifications() == null) {
+            entity.setClassifications(header.getClassifications());
         }
 
         return header;