You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/05/20 21:52:04 UTC

[atlas] branch master updated: ATLAS-4285: Multiple propagations with intersecting lineage.

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new aba97b3  ATLAS-4285: Multiple propagations with intersecting lineage.
aba97b3 is described below

commit aba97b35393f1732eb30858e69fd5f489634afdc
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu May 13 17:28:26 2021 -0700

    ATLAS-4285: Multiple propagations with intersecting lineage.
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
 .../store/graph/v2/EntityGraphMapper.java          | 79 +++-------------------
 .../tasks/ClassificationPropagateTaskFactory.java  |  5 --
 .../v2/tasks/ClassificationPropagationTasks.java   | 15 ----
 .../apache/atlas/tasks/TaskFactoryRegistry.java    | 23 ++++++-
 .../org/apache/atlas/tasks/TaskManagement.java     | 11 +++
 .../ClassificationPropagationWithTasksTest.java    |  6 --
 6 files changed, 43 insertions(+), 96 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 7984a34..5baff33 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -2041,10 +2041,7 @@ public class EntityGraphMapper {
                 return null;
             }
 
-            GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
-
             AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
-
             if (entityVertex == null) {
                 LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
 
@@ -2052,7 +2049,6 @@ public class EntityGraphMapper {
             }
 
             AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
-
             if (classificationVertex == null) {
                 LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
 
@@ -2060,13 +2056,15 @@ public class EntityGraphMapper {
             }
 
             List<AtlasVertex> impactedVertices = entityRetriever.getIncludedImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId);
-
             if (CollectionUtils.isEmpty(impactedVertices)) {
                 LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId);
 
                 return null;
             }
 
+            List<String> impactedVerticesGuidsToLock = impactedVertices.stream().map(x -> GraphHelper.getGuid(x)).collect(Collectors.toList());
+            GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedVerticesGuidsToLock);
+
             AtlasClassification classification       = entityRetriever.toAtlasClassification(classificationVertex);
             List<AtlasVertex>   entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, impactedVertices);
 
@@ -2453,63 +2451,6 @@ public class EntityGraphMapper {
         AtlasPerfTracer.log(perf);
     }
 
-    @GraphTransaction
-    public List<String> updateClassificationsPropagation(String entityGuid, String classificationVertexId, String relationshipGuid) throws AtlasBaseException {
-        try {
-            if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) {
-                LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);
-                return null;
-            }
-
-            AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
-
-            if (entityVertex == null) {
-                LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
-                return null;
-            }
-
-            AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
-
-            if (classificationVertex == null) {
-                LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
-                return null;
-            }
-
-            List<AtlasVertex> entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertex.getIdForDisplay());
-
-            if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
-                LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no impacted vertices found!", entityGuid, classificationVertexId);
-                return null;
-            }
-
-            List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo);
-
-            if (CollectionUtils.isEmpty(entitiesPropagatedTo)) {
-                LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no propagations added!", entityGuid, classificationVertexId);
-                return null;
-            }
-
-            AtlasClassification updatedClassification = entityRetriever.toAtlasClassification(classificationVertex);
-            List<String>        ret                   = new ArrayList<>();
-
-            for (AtlasVertex vertex : entitiesToPropagateTo) {
-                AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
-
-                ret.add(entity.getGuid());
-
-                if (isActive(entity)) {
-                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
-
-                    entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(updatedClassification));
-                }
-            }
-
-            return ret;
-        } catch (Exception ex) {
-            throw new AtlasBaseException(ex);
-        }
-    }
-
     private AtlasEdge mapClassification(EntityOperation operation,  final EntityMutationContext context, AtlasClassification classification,
                                         AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
                                         throws AtlasBaseException {
@@ -2566,11 +2507,8 @@ public class EntityGraphMapper {
                 return null;
             }
 
-            GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
-
             AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
             AtlasClassification classification     = entityRetriever.toAtlasClassification(classificationVertex);
-
             if (classificationVertex == null) {
                 LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex not found", classificationVertexId);
 
@@ -2578,14 +2516,14 @@ public class EntityGraphMapper {
             }
 
             List<AtlasVertex> entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
-
             deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true);
-
             if (CollectionUtils.isEmpty(entityVertices)) {
-
                 return null;
             }
 
+            List<String> impactedGuids = entityVertices.stream().map(x -> GraphHelper.getGuid(x)).collect(Collectors.toList());
+            GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids);
+
             List<AtlasEntity>   propagatedEntities = updateClassificationText(classification, entityVertices);
 
             entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification));
@@ -2787,8 +2725,11 @@ public class EntityGraphMapper {
                 AtlasEntity entity = instanceConverter.getAndCacheEntity(graphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
 
                 if (isActive(entity)) {
-                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+                    String classificationTextForEntity = fullTextMapperV2.getClassificationTextForEntity(entity);
+                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, classificationTextForEntity);
                     propagatedEntities.add(entity);
+
+                    LOG.info("updateClassificationText: {}: {}", classification.getTypeName(), classificationTextForEntity);
                 }
             }
         }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
index 8a81dc9..b3320c6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
@@ -37,13 +37,11 @@ public class ClassificationPropagateTaskFactory implements TaskFactory {
     private static final Logger LOG = LoggerFactory.getLogger(ClassificationPropagateTaskFactory.class);
 
     public static final String CLASSIFICATION_PROPAGATION_ADD                 = "CLASSIFICATION_PROPAGATION_ADD";
-    public static final String CLASSIFICATION_PROPAGATION_UPDATE              = "CLASSIFICATION_PROPAGATION_UPDATE";
     public static final String CLASSIFICATION_PROPAGATION_DELETE              = "CLASSIFICATION_PROPAGATION_DELETE";
     public static final String CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE = "CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE";
 
     private static final List<String> supportedTypes = new ArrayList<String>() {{
         add(CLASSIFICATION_PROPAGATION_ADD);
-        add(CLASSIFICATION_PROPAGATION_UPDATE);
         add(CLASSIFICATION_PROPAGATION_DELETE);
         add(CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE);
     }};
@@ -69,9 +67,6 @@ public class ClassificationPropagateTaskFactory implements TaskFactory {
             case CLASSIFICATION_PROPAGATION_ADD:
                 return new ClassificationPropagationTasks.Add(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
 
-            case CLASSIFICATION_PROPAGATION_UPDATE:
-                return new ClassificationPropagationTasks.Update(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
-
             case CLASSIFICATION_PROPAGATION_DELETE:
                 return new ClassificationPropagationTasks.Delete(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
index f86cbc7..aec5b0c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
@@ -45,21 +45,6 @@ public class ClassificationPropagationTasks {
         }
     }
 
-    public static class Update extends ClassificationTask {
-        public Update(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
-            super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
-        }
-
-        @Override
-        protected void run(Map<String, Object> parameters) throws AtlasBaseException {
-            String entityGuid             = (String) parameters.get(PARAM_ENTITY_GUID);
-            String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
-            String relationshipGuid       = (String) parameters.get(PARAM_RELATIONSHIP_GUID);
-
-            entityGraphMapper.updateClassificationsPropagation(entityGuid, classificationVertexId, relationshipGuid);
-        }
-    }
-
     public static class Delete extends ClassificationTask {
         public Delete(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
             super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
index 38f2cc9..4c93f0e 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
@@ -17,10 +17,12 @@
  */
 package org.apache.atlas.tasks;
 
+import org.apache.atlas.AtlasException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import java.util.Set;
 
@@ -28,12 +30,31 @@ import java.util.Set;
 public class TaskFactoryRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(TaskFactoryRegistry.class);
 
+    private final TaskManagement taskManagement;
+
     @Inject
     public TaskFactoryRegistry(TaskManagement taskManagement, Set<TaskFactory> factories) {
+        this.taskManagement = taskManagement;
         for (TaskFactory factory : factories) {
             taskManagement.addFactory(factory);
         }
 
         LOG.info("TaskFactoryRegistry: TaskManagement updated with factories: {}", factories.size());
     }
-}
\ No newline at end of file
+
+    @PostConstruct
+    public void startTaskManagement() throws AtlasException {
+        try {
+            if (!taskManagement.hasStarted()) {
+                LOG.info("TaskFactoryRegistry: TaskManagement start skipped! Someone else will start it.");
+                return;
+            }
+
+            LOG.info("TaskFactoryRegistry: Starting TaskManagement...");
+            taskManagement.start();
+        } catch (AtlasException e) {
+            LOG.error("Error starting TaskManagement!", e);
+            throw e;
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
index 2756504..9a519ba 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
@@ -49,6 +49,7 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
     private final TaskRegistry              registry;
     private final Statistics                statistics;
     private final Map<String, TaskFactory>  taskTypeFactoryMap;
+    private       boolean                   hasStarted;
 
     @Inject
     public TaskManagement(Configuration configuration, TaskRegistry taskRegistry) {
@@ -75,6 +76,12 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
         } else {
             LOG.info("TaskManagement.start(): deferring until instance activation");
         }
+
+        this.hasStarted = true;
+    }
+
+    public boolean hasStarted() {
+        return this.hasStarted;
     }
 
     @Override
@@ -183,6 +190,10 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
         }
 
         LOG.info("TaskManagement: Started!");
+        if (this.taskTypeFactoryMap.size() == 0) {
+            LOG.warn("Not factories registered! Pending tasks will be queued once factories are registered!");
+            return;
+        }
 
         queuePendingTasks();
     }
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
index 84aefc9..d440f2f 100644
--- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -131,9 +131,6 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
         List<String> ret = entityGraphMapper.propagateClassification(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
         assertNull(ret);
 
-        ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
-        assertNull(ret);
-
         ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY, StringUtils.EMPTY);
         assertNull(ret);
 
@@ -191,9 +188,6 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
 
         assertNotNull(entityVertex);
         assertNotNull(classificationVertex);
-
-        List<String> impactedEntities = entityGraphMapper.updateClassificationsPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString(), StringUtils.EMPTY);
-        assertNotNull(impactedEntities);
     }
 
     @Test(dependsOnMethods = "update")