You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2020/02/16 08:14:42 UTC

[atlas] branch branch-0.8 updated: ATLAS-3621: updated HiveHook to not save query-string in multiple attributes - queryText and name

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 0fc8d45  ATLAS-3621: updated HiveHook to not save query-string in multiple attributes - queryText and name
0fc8d45 is described below

commit 0fc8d45fa16933005ca152364f4b025160d5821b
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Fri Feb 14 16:21:46 2020 -0800

    ATLAS-3621: updated HiveHook to not save query-string in multiple attributes - queryText and name
---
 .../atlas/hive/hook/events/BaseHiveEvent.java      |  2 +-
 .../notification/NotificationHookConsumer.java     | 40 +++++++++++++++++++---
 .../preprocessor/HivePreprocessor.java             |  8 +++++
 .../preprocessor/PreprocessorContext.java          | 22 +++++++-----
 .../NotificationHookConsumerKafkaTest.java         | 13 ++++++-
 .../notification/NotificationHookConsumerTest.java |  3 +-
 6 files changed, 72 insertions(+), 16 deletions(-)

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index eae88bc..0924e0c 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -551,7 +551,7 @@ public abstract class BaseHiveEvent {
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
         ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs));
         ret.setAttribute(ATTRIBUTE_OUTPUTS,  getObjectIds(outputs));
-        ret.setAttribute(ATTRIBUTE_NAME, queryStr);
+        ret.setAttribute(ATTRIBUTE_NAME, ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
         ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, hookContext.getOperationName());
         ret.setAttribute(ATTRIBUTE_START_TIME, hookContext.getQueryPlan().getQueryStartTime());
         ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 0a41c95..836e6f8 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -29,6 +29,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.RequestContextV1;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
@@ -137,6 +138,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES             = "atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED   = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES           = "atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
+    public static final String CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME = "atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name";
 
     private final AtlasEntityStore              atlasEntityStore;
     private final ServiceState                  serviceState;
@@ -146,6 +148,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final int                           failedMsgCacheSize;
     private final boolean                       skipHiveColumnLineageHive20633;
     private final int                           skipHiveColumnLineageHive20633InputsThreshold;
+    private final boolean                       updateHiveProcessNameWithQualifiedName;
     private final int                           largeMessageProcessingTimeThresholdMs;
     private final boolean                       consumerDisabled;
     private final List<Pattern>                 hiveTablesToIgnore = new ArrayList<>();
@@ -188,6 +191,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
         skipHiveColumnLineageHive20633                = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true);
         skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
+        updateHiveProcessNameWithQualifiedName        = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME, true);
         largeMessageProcessingTimeThresholdMs         = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000);  //  60 sec by default
         consumerDisabled 							  = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
 
@@ -264,7 +268,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             hiveTablePrefixesToIgnore = Collections.emptyList();
         }
 
-        preprocessEnabled = skipHiveColumnLineageHive20633 || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
+        LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME, updateHiveProcessNameWithQualifiedName);
+
+        preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold);
@@ -528,6 +534,33 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                     return;
                 }
 
+                // convert V1 messages to V2 so that preProcessNotificationMessage() can be applied to them
+                try {
+                    switch (message.getType()) {
+                        case ENTITY_CREATE: {
+                            final EntityCreateRequest      createRequest = (EntityCreateRequest) message;
+                            final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(createRequest.getEntities());
+                            final EntityCreateRequestV2    v2Request     = new EntityCreateRequestV2(messageUser, entities);
+
+                            kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getPartition());
+                            message  = kafkaMsg.getMessage();
+                        }
+                        break;
+
+                        case ENTITY_FULL_UPDATE: {
+                            final EntityUpdateRequest      updateRequest = (EntityUpdateRequest) message;
+                            final AtlasEntitiesWithExtInfo entities      = instanceConverter.toAtlasEntities(updateRequest.getEntities());
+                            final EntityUpdateRequestV2    v2Request     = new EntityUpdateRequestV2(messageUser, entities);
+
+                            kafkaMsg = new AtlasKafkaMessage<>(v2Request, kafkaMsg.getOffset(), kafkaMsg.getPartition());
+                            message  = kafkaMsg.getMessage();
+                        }
+                        break;
+                    }
+                } catch (AtlasBaseException excp) { // best-effort: on failure, continue with the unconverted V1 message
+                    LOG.error("handleMessage(): failed to convert V1 message to V2. type={}", message.getType().name(), excp);
+                }
+
                 preProcessNotificationMessage(kafkaMsg);
 
                 if (isEmptyMessage(kafkaMsg)) {
@@ -801,7 +834,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         PreprocessorContext context = null;
 
         if (preprocessEnabled) {
-            context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore);
+            context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, updateHiveProcessNameWithQualifiedName);
 
             if (context.isHivePreprocessEnabled()) {
                 preprocessHiveTypes(context);
@@ -811,10 +844,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                 skipHiveColumnLineage(context);
             }
 
-
             context.moveRegisteredReferredEntities();
 
-            if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) {
+            if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities()) && context.getEntities().size() > 1) {
                 // move hive_process and hive_column_lineage entities to end of the list
                 List<AtlasEntity> entities = context.getEntities();
                 int               count    = entities.size();
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index 1127af8..a3433cc 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -155,6 +155,14 @@ public class HivePreprocessor {
 
         @Override
         public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (context.updateHiveProcessNameWithQualifiedName()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("setting {}.name={}. topic-offset={}, partition={}", entity.getTypeName(), entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), context.getKafkaMessageOffset(), context.getKafkaPartition());
+                }
+
+                entity.setAttribute(ATTRIBUTE_NAME, entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+            }
+
             if (context.isIgnoredEntity(entity.getGuid())) {
                 context.addToIgnoredEntities(entity); // so that this will be logged with typeName and qualifiedName
             } else {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 6e42655..d06de13 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -52,19 +52,21 @@ public class PreprocessorContext {
     private final List<String>                               hiveDummyDatabasesToIgnore;
     private final List<String>                               hiveDummyTablesToIgnore;
     private final List<String>                               hiveTablePrefixesToIgnore;
+    private final boolean                                    updateHiveProcessNameWithQualifiedName;
     private final boolean                                    isHivePreProcessEnabled;
     private final Set<String>                                ignoredEntities        = new HashSet<>();
     private final Set<String>                                prunedEntities         = new HashSet<>();
     private final Set<String>                                referredEntitiesToMove = new HashSet<>();
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore) {
-        this.kafkaMessage                  = kafkaMessage;
-        this.hiveTablesToIgnore            = hiveTablesToIgnore;
-        this.hiveTablesToPrune             = hiveTablesToPrune;
-        this.hiveTablesCache               = hiveTablesCache;
-        this.hiveDummyDatabasesToIgnore    = hiveDummyDatabasesToIgnore;
-        this.hiveDummyTablesToIgnore       = hiveDummyTablesToIgnore;
-        this.hiveTablePrefixesToIgnore     = hiveTablePrefixesToIgnore;
+    public PreprocessorContext(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean updateHiveProcessNameWithQualifiedName) {
+        this.kafkaMessage                           = kafkaMessage;
+        this.hiveTablesToIgnore                     = hiveTablesToIgnore;
+        this.hiveTablesToPrune                      = hiveTablesToPrune;
+        this.hiveTablesCache                        = hiveTablesCache;
+        this.hiveDummyDatabasesToIgnore             = hiveDummyDatabasesToIgnore;
+        this.hiveDummyTablesToIgnore                = hiveDummyTablesToIgnore;
+        this.hiveTablePrefixesToIgnore              = hiveTablePrefixesToIgnore;
+        this.updateHiveProcessNameWithQualifiedName = updateHiveProcessNameWithQualifiedName;
 
         final HookNotificationMessage  message = kafkaMessage.getMessage();
 
@@ -82,7 +84,7 @@ public class PreprocessorContext {
             break;
         }
 
-        this.isHivePreProcessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
+        this.isHivePreProcessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty() || updateHiveProcessNameWithQualifiedName;
     }
 
     public AtlasKafkaMessage<HookNotificationMessage> getKafkaMessage() {
@@ -97,6 +99,8 @@ public class PreprocessorContext {
         return kafkaMessage.getPartition();
     }
 
+    public boolean updateHiveProcessNameWithQualifiedName() { return updateHiveProcessNameWithQualifiedName; }
+
     public boolean isHivePreprocessEnabled() {
         return isHivePreProcessEnabled;
     }
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 14ecc2d..22b70bb 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -25,6 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.kafka.KafkaNotification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.kafka.*;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -86,7 +87,7 @@ public class NotificationHookConsumerKafkaTest {
         MockitoAnnotations.initMocks(this);
         AtlasType mockType = mock(AtlasType.class);
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
-        AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
+        AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = new AtlasEntitiesWithExtInfo(createV2Entity());
         when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
 
         initNotificationService();
@@ -208,6 +209,16 @@ public class NotificationHookConsumerKafkaTest {
         return entity;
     }
 
+    AtlasEntity createV2Entity() {
+        final AtlasEntity entity = new AtlasEntity(AtlasClient.DATA_SET_SUPER_TYPE);
+
+        entity.setAttribute(NAME, "db" + randomString());
+        entity.setAttribute(DESCRIPTION, randomString());
+        entity.setAttribute(QUALIFIED_NAME, randomString());
+
+        return entity;
+    }
+
     protected String randomString() {
         return RandomStringUtils.randomAlphanumeric(10);
     }
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index b63bedd..a757e0d 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -78,7 +79,7 @@ public class NotificationHookConsumerTest {
         MockitoAnnotations.initMocks(this);
         AtlasType mockType = mock(AtlasType.class);
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
-        AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
+        AtlasEntitiesWithExtInfo mockEntity = new AtlasEntitiesWithExtInfo(mock(AtlasEntity.class));
         when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
         EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class);
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse);