You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/05/20 21:52:04 UTC
[atlas] branch master updated: ATLAS-4285: Multiple propagations
with intersecting lineage.
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new aba97b3 ATLAS-4285: Multiple propagations with intersecting lineage.
aba97b3 is described below
commit aba97b35393f1732eb30858e69fd5f489634afdc
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu May 13 17:28:26 2021 -0700
ATLAS-4285: Multiple propagations with intersecting lineage.
Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
.../store/graph/v2/EntityGraphMapper.java | 79 +++-------------------
.../tasks/ClassificationPropagateTaskFactory.java | 5 --
.../v2/tasks/ClassificationPropagationTasks.java | 15 ----
.../apache/atlas/tasks/TaskFactoryRegistry.java | 23 ++++++-
.../org/apache/atlas/tasks/TaskManagement.java | 11 +++
.../ClassificationPropagationWithTasksTest.java | 6 --
6 files changed, 43 insertions(+), 96 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 7984a34..5baff33 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -2041,10 +2041,7 @@ public class EntityGraphMapper {
return null;
}
- GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
-
AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
-
if (entityVertex == null) {
LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
@@ -2052,7 +2049,6 @@ public class EntityGraphMapper {
}
AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
-
if (classificationVertex == null) {
LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
@@ -2060,13 +2056,15 @@ public class EntityGraphMapper {
}
List<AtlasVertex> impactedVertices = entityRetriever.getIncludedImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId);
-
if (CollectionUtils.isEmpty(impactedVertices)) {
LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId);
return null;
}
+ List<String> impactedVerticesGuidsToLock = impactedVertices.stream().map(x -> GraphHelper.getGuid(x)).collect(Collectors.toList());
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedVerticesGuidsToLock);
+
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);
List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, impactedVertices);
@@ -2453,63 +2451,6 @@ public class EntityGraphMapper {
AtlasPerfTracer.log(perf);
}
- @GraphTransaction
- public List<String> updateClassificationsPropagation(String entityGuid, String classificationVertexId, String relationshipGuid) throws AtlasBaseException {
- try {
- if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) {
- LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);
- return null;
- }
-
- AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
-
- if (entityVertex == null) {
- LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
- return null;
- }
-
- AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
-
- if (classificationVertex == null) {
- LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
- return null;
- }
-
- List<AtlasVertex> entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertex.getIdForDisplay());
-
- if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
- LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no impacted vertices found!", entityGuid, classificationVertexId);
- return null;
- }
-
- List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo);
-
- if (CollectionUtils.isEmpty(entitiesPropagatedTo)) {
- LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no propagations added!", entityGuid, classificationVertexId);
- return null;
- }
-
- AtlasClassification updatedClassification = entityRetriever.toAtlasClassification(classificationVertex);
- List<String> ret = new ArrayList<>();
-
- for (AtlasVertex vertex : entitiesToPropagateTo) {
- AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
-
- ret.add(entity.getGuid());
-
- if (isActive(entity)) {
- vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
-
- entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(updatedClassification));
- }
- }
-
- return ret;
- } catch (Exception ex) {
- throw new AtlasBaseException(ex);
- }
- }
-
private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification,
AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
throws AtlasBaseException {
@@ -2566,11 +2507,8 @@ public class EntityGraphMapper {
return null;
}
- GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
-
AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);
-
if (classificationVertex == null) {
LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex not found", classificationVertexId);
@@ -2578,14 +2516,14 @@ public class EntityGraphMapper {
}
List<AtlasVertex> entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
-
deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true);
-
if (CollectionUtils.isEmpty(entityVertices)) {
-
return null;
}
+ List<String> impactedGuids = entityVertices.stream().map(x -> GraphHelper.getGuid(x)).collect(Collectors.toList());
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids);
+
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, entityVertices);
entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification));
@@ -2787,8 +2725,11 @@ public class EntityGraphMapper {
AtlasEntity entity = instanceConverter.getAndCacheEntity(graphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
if (isActive(entity)) {
- vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+ String classificationTextForEntity = fullTextMapperV2.getClassificationTextForEntity(entity);
+ vertex.setProperty(CLASSIFICATION_TEXT_KEY, classificationTextForEntity);
propagatedEntities.add(entity);
+
+ LOG.info("updateClassificationText: {}: {}", classification.getTypeName(), classificationTextForEntity);
}
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
index 8a81dc9..b3320c6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
@@ -37,13 +37,11 @@ public class ClassificationPropagateTaskFactory implements TaskFactory {
private static final Logger LOG = LoggerFactory.getLogger(ClassificationPropagateTaskFactory.class);
public static final String CLASSIFICATION_PROPAGATION_ADD = "CLASSIFICATION_PROPAGATION_ADD";
- public static final String CLASSIFICATION_PROPAGATION_UPDATE = "CLASSIFICATION_PROPAGATION_UPDATE";
public static final String CLASSIFICATION_PROPAGATION_DELETE = "CLASSIFICATION_PROPAGATION_DELETE";
public static final String CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE = "CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE";
private static final List<String> supportedTypes = new ArrayList<String>() {{
add(CLASSIFICATION_PROPAGATION_ADD);
- add(CLASSIFICATION_PROPAGATION_UPDATE);
add(CLASSIFICATION_PROPAGATION_DELETE);
add(CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE);
}};
@@ -69,9 +67,6 @@ public class ClassificationPropagateTaskFactory implements TaskFactory {
case CLASSIFICATION_PROPAGATION_ADD:
return new ClassificationPropagationTasks.Add(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
- case CLASSIFICATION_PROPAGATION_UPDATE:
- return new ClassificationPropagationTasks.Update(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
-
case CLASSIFICATION_PROPAGATION_DELETE:
return new ClassificationPropagationTasks.Delete(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
index f86cbc7..aec5b0c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
@@ -45,21 +45,6 @@ public class ClassificationPropagationTasks {
}
}
- public static class Update extends ClassificationTask {
- public Update(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
- super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
- }
-
- @Override
- protected void run(Map<String, Object> parameters) throws AtlasBaseException {
- String entityGuid = (String) parameters.get(PARAM_ENTITY_GUID);
- String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
- String relationshipGuid = (String) parameters.get(PARAM_RELATIONSHIP_GUID);
-
- entityGraphMapper.updateClassificationsPropagation(entityGuid, classificationVertexId, relationshipGuid);
- }
- }
-
public static class Delete extends ClassificationTask {
public Delete(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
index 38f2cc9..4c93f0e 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
@@ -17,10 +17,12 @@
*/
package org.apache.atlas.tasks;
+import org.apache.atlas.AtlasException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.util.Set;
@@ -28,12 +30,31 @@ import java.util.Set;
public class TaskFactoryRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskFactoryRegistry.class);
+ private final TaskManagement taskManagement;
+
@Inject
public TaskFactoryRegistry(TaskManagement taskManagement, Set<TaskFactory> factories) {
+ this.taskManagement = taskManagement;
for (TaskFactory factory : factories) {
taskManagement.addFactory(factory);
}
LOG.info("TaskFactoryRegistry: TaskManagement updated with factories: {}", factories.size());
}
-}
\ No newline at end of file
+
+ @PostConstruct
+ public void startTaskManagement() throws AtlasException {
+ try {
+ if (!taskManagement.hasStarted()) {
+ LOG.info("TaskFactoryRegistry: TaskManagement start skipped! Someone else will start it.");
+ return;
+ }
+
+ LOG.info("TaskFactoryRegistry: Starting TaskManagement...");
+ taskManagement.start();
+ } catch (AtlasException e) {
+ LOG.error("Error starting TaskManagement!", e);
+ throw e;
+ }
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
index 2756504..9a519ba 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
@@ -49,6 +49,7 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
private final TaskRegistry registry;
private final Statistics statistics;
private final Map<String, TaskFactory> taskTypeFactoryMap;
+ private boolean hasStarted;
@Inject
public TaskManagement(Configuration configuration, TaskRegistry taskRegistry) {
@@ -75,6 +76,12 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
} else {
LOG.info("TaskManagement.start(): deferring until instance activation");
}
+
+ this.hasStarted = true;
+ }
+
+ public boolean hasStarted() {
+ return this.hasStarted;
}
@Override
@@ -183,6 +190,10 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
}
LOG.info("TaskManagement: Started!");
+ if (this.taskTypeFactoryMap.size() == 0) {
+ LOG.warn("Not factories registered! Pending tasks will be queued once factories are registered!");
+ return;
+ }
queuePendingTasks();
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
index 84aefc9..d440f2f 100644
--- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -131,9 +131,6 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
List<String> ret = entityGraphMapper.propagateClassification(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
assertNull(ret);
- ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
- assertNull(ret);
-
ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY, StringUtils.EMPTY);
assertNull(ret);
@@ -191,9 +188,6 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
assertNotNull(entityVertex);
assertNotNull(classificationVertex);
-
- List<String> impactedEntities = entityGraphMapper.updateClassificationsPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString(), StringUtils.EMPTY);
- assertNotNull(impactedEntities);
}
@Test(dependsOnMethods = "update")