You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/04/27 05:46:43 UTC

[atlas] branch master updated: ATLAS-4256, ATLAS-4258: AtlasTasks - Elegant handling of Failover Scenarios

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new eddda69  ATLAS-4256, ATLAS-4258: AtlasTasks - Elegant handling of Failover Scenarios
eddda69 is described below

commit eddda699def908895f17384735bf90e08f02d6bf
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon Apr 26 22:02:02 2021 -0700

    ATLAS-4256, ATLAS-4258: AtlasTasks - Elegant handling of Failover Scenarios
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
 .../org/apache/atlas/repository/Constants.java     |  2 +-
 .../repository/graph/GraphBackedSearchIndexer.java |  1 +
 .../repository/store/graph/v1/DeleteHandlerV1.java |  2 +-
 .../store/graph/v2/AtlasGraphUtilsV2.java          | 26 ++++++++++++++
 .../store/graph/v2/EntityGraphMapper.java          | 42 ++++++++++++++++++++--
 .../store/graph/v2/EntityGraphRetriever.java       |  6 +++-
 .../tasks/ClassificationPropagateTaskFactory.java  |  4 +--
 .../v2/tasks/ClassificationPropagationTasks.java   |  3 +-
 .../store/graph/v2/tasks/ClassificationTask.java   | 26 ++++++++------
 .../apache/atlas/tasks/TaskFactoryRegistry.java    | 39 ++++++++++++++++++++
 .../org/apache/atlas/tasks/TaskManagement.java     |  6 ++--
 .../ClassificationPropagationWithTasksTest.java    |  4 +--
 .../apache/atlas/web/resources/AdminResource.java  |  5 ---
 13 files changed, 137 insertions(+), 29 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 4df38a5..ffcec97 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -94,7 +94,7 @@ public final class Constants {
     public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY  = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames");
     public static final String CUSTOM_ATTRIBUTES_PROPERTY_KEY       = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "customAttributes");
     public static final String LABELS_PROPERTY_KEY                  = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "labels");
-    public static final String PENDING_TASKS_PROPERTY_KEY           = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks");
+    public static final String EDGE_PENDING_TASKS_PROPERTY_KEY      = encodePropertyKey(RELATIONSHIP_PROPERTY_KEY_PREFIX + "__pendingTasks");
 
     /**
      * Patch vertices property keys.
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 276343e..cc727c6 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -65,6 +65,7 @@ import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isRef
 import static org.apache.atlas.type.AtlasStructType.UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX;
 import static org.apache.atlas.type.AtlasTypeUtil.isArrayType;
 import static org.apache.atlas.type.AtlasTypeUtil.isMapType;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
 
 
 /**
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 20d5e6f..f118ae6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -1203,7 +1203,7 @@ public abstract class DeleteHandlerV1 {
         Map<String, Object> taskParams         = ClassificationTask.toParameters(relationshipEdgeId, relationship);
         AtlasTask           task               = taskManagement.createTask(taskType, currentUser, taskParams);
 
-        AtlasGraphUtilsV2.addEncodedProperty(relationshipEdge, PENDING_TASKS_PROPERTY_KEY, task.getGuid());
+        AtlasGraphUtilsV2.addItemToListProperty(relationshipEdge, EDGE_PENDING_TASKS_PROPERTY_KEY, task.getGuid());
 
         RequestContext.get().queueTask(task);
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 8d4fdf3..0a94708 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -47,6 +47,7 @@ import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.util.FileUtils;
 import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
@@ -841,4 +842,29 @@ public class AtlasGraphUtilsV2 {
         return ret;
     }
 
+    /** Appends value to the list stored under property on the edge, then writes the list back. */
+    public static void addItemToListProperty(AtlasEdge edge, String property, String value) {
+        List<String> list = getListFromProperty(edge, property);
+
+        list.add(value);
+        edge.setListProperty(property, list);
+    }
+
+    /** Removes the first occurrence of value from the edge's list property; drops the property when no items remain. */
+    public static void removeItemFromListProperty(AtlasEdge edge, String property, String value) {
+        List<String> list = getListFromProperty(edge, property);
+
+        list.remove(value);
+        if (CollectionUtils.isEmpty(list)) {
+            edge.removeProperty(property);
+        } else {
+            edge.setListProperty(property, list);
+        }
+    }
+
+    /** Returns the edge's list property, or a new empty ArrayList when the property is unset or empty. */
+    private static List<String> getListFromProperty(AtlasEdge edge, String property) {
+        List<String> list = edge.getListProperty(property);
+        return CollectionUtils.isEmpty(list) ? new ArrayList<>() : list;
+    }
 }
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index d8ef32b..2a71e34 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -25,7 +25,8 @@ import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.TimeBoundary;
+ import org.apache.atlas.exception.EntityNotFoundException;
+ import org.apache.atlas.model.TimeBoundary;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -124,6 +125,7 @@ import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPro
 import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
 
 @Component
 public class EntityGraphMapper {
@@ -2039,6 +2041,8 @@ public class EntityGraphMapper {
                 return null;
             }
 
+            GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
+
             AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
 
             if (entityVertex == null) {
@@ -2554,7 +2558,7 @@ public class EntityGraphMapper {
     }
 
     @GraphTransaction
-    public List<String> deleteClassificationPropagation(String classificationVertexId) throws AtlasBaseException {
+    public List<String> deleteClassificationPropagation(String entityGuid, String classificationVertexId) throws AtlasBaseException {
         try {
             if (StringUtils.isEmpty(classificationVertexId)) {
                 LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex id is empty", classificationVertexId);
@@ -2562,6 +2566,8 @@ public class EntityGraphMapper {
                 return null;
             }
 
+            GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
+
             AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
 
             if (classificationVertex == null) {
@@ -2822,4 +2828,36 @@ public class EntityGraphMapper {
     private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId) {
         deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null);
     }
+
+    /** Removes taskGuid from the entity vertex's pending-tasks property; does nothing when either argument is empty. */
+    public void removePendingTaskFromEntity(String entityGuid, String taskGuid) throws EntityNotFoundException {
+        if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(taskGuid)) {
+            return;
+        }
+
+        AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
+
+        if (entityVertex == null) {
+            // Log the GUID that was looked up: the vertex reference is null on this path and carries no information.
+            LOG.warn("Error fetching vertex: {}", entityGuid);
+            return;
+        }
+        entityVertex.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, taskGuid);
+    }
+
+    /** Drops taskGuid from the pending-task list kept on the edge identified by edgeId; empty arguments are ignored. */
+    public void removePendingTaskFromEdge(String edgeId, String taskGuid) throws AtlasBaseException {
+        boolean missingArgument = StringUtils.isEmpty(edgeId) || StringUtils.isEmpty(taskGuid);
+        if (missingArgument) {
+            return;
+        }
+
+        AtlasEdge pendingTaskEdge = graph.getEdge(edgeId);
+
+        if (pendingTaskEdge != null) {
+            AtlasGraphUtilsV2.removeItemFromListProperty(pendingTaskEdge, EDGE_PENDING_TASKS_PROPERTY_KEY, taskGuid);
+        } else {
+            LOG.warn("Error fetching edge: {}", edgeId);
+        }
+    }
 }
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index b6f1ef7..2e0f39a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -1189,7 +1189,7 @@ public class EntityGraphRetriever {
                 continue;
             }
 
-            if (ignoreInactive && GraphHelper.getStatus((AtlasEdge) element) != AtlasEntity.Status.ACTIVE) {
+            if (isInactiveEdge(element, ignoreInactive)) {
                 continue;
             }
 
@@ -1710,4 +1710,8 @@ public class EntityGraphRetriever {
 
         return new HashSet<>(ret);
     }
+    /** True when inactive elements are being ignored and element is an AtlasEdge whose status is not ACTIVE. */
+    private boolean isInactiveEdge(Object element, boolean ignoreInactive) {
+        return ignoreInactive && element instanceof AtlasEdge && !AtlasEntity.Status.ACTIVE.equals(getStatus((AtlasEdge) element));
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
index 6244b2d..8a81dc9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
@@ -54,13 +54,11 @@ public class ClassificationPropagateTaskFactory implements TaskFactory {
     private final AtlasRelationshipStore relationshipStore;
 
     @Inject
-    public ClassificationPropagateTaskFactory(TaskManagement taskManagement, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+    public ClassificationPropagateTaskFactory(AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
         this.graph             = graph;
         this.entityGraphMapper = entityGraphMapper;
         this.deleteDelegate    = deleteDelegate;
         this.relationshipStore = relationshipStore;
-
-        taskManagement.addFactory(this);
     }
 
     public org.apache.atlas.tasks.AbstractTask create(AtlasTask task) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
index 4fda34a..f86cbc7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
@@ -67,9 +67,10 @@ public class ClassificationPropagationTasks {
 
         @Override
         protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+            String entityGuid             = (String) parameters.get(PARAM_ENTITY_GUID);
             String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
 
-            entityGraphMapper.deleteClassificationPropagation(classificationVertexId);
+            entityGraphMapper.deleteClassificationPropagation(entityGuid, classificationVertexId);
         }
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
index 369db08..00c9caa 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
@@ -19,8 +19,10 @@ package org.apache.atlas.repository.store.graph.v2.tasks;
 
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.exception.EntityNotFoundException;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasElement;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
@@ -56,7 +58,11 @@ public abstract class ClassificationTask extends AbstractTask {
     protected final DeleteHandlerDelegate  deleteDelegate;
     protected final AtlasRelationshipStore relationshipStore;
 
-    public ClassificationTask(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+    public ClassificationTask(AtlasTask task,
+                              AtlasGraph graph,
+                              EntityGraphMapper entityGraphMapper,
+                              DeleteHandlerDelegate deleteDelegate,
+                              AtlasRelationshipStore relationshipStore) {
         super(task);
 
         this.graph             = graph;
@@ -120,17 +126,15 @@ public abstract class ClassificationTask extends AbstractTask {
     protected void setStatus(AtlasTask.Status status) {
         super.setStatus(status);
 
-        // remove pending task guid from entity vertex or relationship edge
-        AtlasElement element;
-
-        if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) {
-            element = graph.getEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID));
-
-        } else {
-            element = AtlasGraphUtilsV2.findByGuid((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID));
+        try {
+            if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) {
+                entityGraphMapper.removePendingTaskFromEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID), getTaskGuid());
+            } else {
+                entityGraphMapper.removePendingTaskFromEntity((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID), getTaskGuid());
+            }
+        } catch (EntityNotFoundException | AtlasBaseException e) {
+            LOG.error("Error updating associated element for: {}", getTaskGuid(), e);
         }
-
-        element.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, getTaskGuid());
     }
 
     protected abstract void run(Map<String, Object> parameters) throws AtlasBaseException;
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
new file mode 100644
index 0000000..38f2cc9
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/** Spring component whose constructor registers every injected TaskFactory with TaskManagement. */
+@Component
+public class TaskFactoryRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(TaskFactoryRegistry.class);
+
+    @Inject
+    public TaskFactoryRegistry(TaskManagement taskManagement, Set<TaskFactory> factories) {
+        // Hand each factory to TaskManagement, then log how many were supplied.
+        factories.forEach(factory -> taskManagement.addFactory(factory));
+
+        LOG.info("TaskFactoryRegistry: TaskManagement updated with factories: {}", factories.size());
+    }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
index 264aa8c..2756504 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
@@ -73,7 +73,7 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
         if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) {
             startInternal();
         } else {
-            LOG.info("TaskManagement.start(): deferring patches until instance activation");
+            LOG.info("TaskManagement.start(): deferring until instance activation");
         }
     }
 
@@ -183,9 +183,11 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
         }
 
         LOG.info("TaskManagement: Started!");
+
+        queuePendingTasks();
     }
 
-    public void queuePendingTasks() {
+    private void queuePendingTasks() {
         if (AtlasConfiguration.TASKS_USE_ENABLED.getBoolean() == false) {
             return;
         }
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
index e309a76..84aefc9 100644
--- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -134,7 +134,7 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
         ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
         assertNull(ret);
 
-        ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY);
+        ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY, StringUtils.EMPTY);
         assertNull(ret);
 
         AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
@@ -215,7 +215,7 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
         assertNotNull(entityVertex);
         assertNotNull(classificationVertex);
 
-        List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(classificationVertex.getId().toString());
+        List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString());
         assertNotNull(impactedEntities);
     }
 
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 714b400..a9fa8ba 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -211,11 +211,6 @@ public class AdminResource {
         }
     }
 
-    @PostConstruct
-    public void init() {
-        taskManagement.queuePendingTasks();
-    }
-
     /**
      * Fetches the thread stack dump for this application.
      *