You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/04/27 05:47:07 UTC
[atlas] branch branch-2.0 updated: ATLAS-4256,
ATLAS-4258: AtlasTasks - Elegant handling of Failover Scenarios
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new af1990c ATLAS-4256, ATLAS-4258: AtlasTasks - Elegant handling of Failover Scenarios
af1990c is described below
commit af1990cfa5e31858798e85ee9425c9b0247f0bb1
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon Apr 26 22:02:02 2021 -0700
ATLAS-4256, ATLAS-4258: AtlasTasks - Elegant handling of Failover Scenarios
Signed-off-by: Sarath Subramanian <sa...@apache.org>
(cherry picked from commit eddda699def908895f17384735bf90e08f02d6bf)
---
.../org/apache/atlas/repository/Constants.java | 2 +-
.../repository/graph/GraphBackedSearchIndexer.java | 1 +
.../repository/store/graph/v1/DeleteHandlerV1.java | 2 +-
.../store/graph/v2/AtlasGraphUtilsV2.java | 26 ++++++++++++++
.../store/graph/v2/EntityGraphMapper.java | 42 ++++++++++++++++++++--
.../store/graph/v2/EntityGraphRetriever.java | 6 +++-
.../tasks/ClassificationPropagateTaskFactory.java | 4 +--
.../v2/tasks/ClassificationPropagationTasks.java | 3 +-
.../store/graph/v2/tasks/ClassificationTask.java | 26 ++++++++------
.../apache/atlas/tasks/TaskFactoryRegistry.java | 39 ++++++++++++++++++++
.../org/apache/atlas/tasks/TaskManagement.java | 6 ++--
.../ClassificationPropagationWithTasksTest.java | 4 +--
.../apache/atlas/web/resources/AdminResource.java | 5 ---
13 files changed, 137 insertions(+), 29 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 4df38a5..ffcec97 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -94,7 +94,7 @@ public final class Constants {
public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames");
public static final String CUSTOM_ATTRIBUTES_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "customAttributes");
public static final String LABELS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "labels");
- public static final String PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks");
+ public static final String EDGE_PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(RELATIONSHIP_PROPERTY_KEY_PREFIX + "__pendingTasks");
/**
* Patch vertices property keys.
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 47c8b14..c73449e 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -65,6 +65,7 @@ import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isRef
import static org.apache.atlas.type.AtlasStructType.UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX;
import static org.apache.atlas.type.AtlasTypeUtil.isArrayType;
import static org.apache.atlas.type.AtlasTypeUtil.isMapType;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
/**
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 20d5e6f..f118ae6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -1203,7 +1203,7 @@ public abstract class DeleteHandlerV1 {
Map<String, Object> taskParams = ClassificationTask.toParameters(relationshipEdgeId, relationship);
AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams);
- AtlasGraphUtilsV2.addEncodedProperty(relationshipEdge, PENDING_TASKS_PROPERTY_KEY, task.getGuid());
+ AtlasGraphUtilsV2.addItemToListProperty(relationshipEdge, EDGE_PENDING_TASKS_PROPERTY_KEY, task.getGuid());
RequestContext.get().queueTask(task);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 8d4fdf3..0a94708 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -47,6 +47,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
@@ -841,4 +842,29 @@ public class AtlasGraphUtilsV2 {
return ret;
}
+ public static void addItemToListProperty(AtlasEdge edge, String property, String value) {
+ List list = getListFromProperty(edge, property);
+
+ list.add(value);
+
+ edge.setListProperty(property, list);
+ }
+
+ public static void removeItemFromListProperty(AtlasEdge edge, String property, String value) {
+ List list = getListFromProperty(edge, property);
+
+ list.remove(value);
+
+ if (CollectionUtils.isEmpty(list)) {
+ edge.removeProperty(property);
+ } else {
+ edge.setListProperty(property, list);
+ }
+ }
+
+ private static List getListFromProperty(AtlasEdge edge, String property) {
+ List list = edge.getListProperty(property);
+
+ return CollectionUtils.isEmpty(list) ? new ArrayList() : list;
+ }
}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index d8ef32b..2a71e34 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -25,7 +25,8 @@ import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.TimeBoundary;
+ import org.apache.atlas.exception.EntityNotFoundException;
+ import org.apache.atlas.model.TimeBoundary;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -124,6 +125,7 @@ import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPro
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
@Component
public class EntityGraphMapper {
@@ -2039,6 +2041,8 @@ public class EntityGraphMapper {
return null;
}
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
+
AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
if (entityVertex == null) {
@@ -2554,7 +2558,7 @@ public class EntityGraphMapper {
}
@GraphTransaction
- public List<String> deleteClassificationPropagation(String classificationVertexId) throws AtlasBaseException {
+ public List<String> deleteClassificationPropagation(String entityGuid, String classificationVertexId) throws AtlasBaseException {
try {
if (StringUtils.isEmpty(classificationVertexId)) {
LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex id is empty", classificationVertexId);
@@ -2562,6 +2566,8 @@ public class EntityGraphMapper {
return null;
}
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
+
AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
if (classificationVertex == null) {
@@ -2822,4 +2828,36 @@ public class EntityGraphMapper {
private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId) {
deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null);
}
+
+ public void removePendingTaskFromEntity(String entityGuid, String taskGuid) throws EntityNotFoundException {
+ if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(taskGuid)) {
+ return;
+ }
+
+ AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
+
+ if (entityVertex == null) {
+ LOG.warn("Error fetching vertex: {}", entityVertex);
+
+ return;
+ }
+
+ entityVertex.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, taskGuid);
+ }
+
+ public void removePendingTaskFromEdge(String edgeId, String taskGuid) throws AtlasBaseException {
+ if (StringUtils.isEmpty(edgeId) || StringUtils.isEmpty(taskGuid)) {
+ return;
+ }
+
+ AtlasEdge edge = graph.getEdge(edgeId);
+
+ if (edge == null) {
+ LOG.warn("Error fetching edge: {}", edgeId);
+
+ return;
+ }
+
+ AtlasGraphUtilsV2.removeItemFromListProperty(edge, EDGE_PENDING_TASKS_PROPERTY_KEY, taskGuid);
+ }
}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index b6f1ef7..2e0f39a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -1189,7 +1189,7 @@ public class EntityGraphRetriever {
continue;
}
- if (ignoreInactive && GraphHelper.getStatus((AtlasEdge) element) != AtlasEntity.Status.ACTIVE) {
+ if (isInactiveEdge(element, ignoreInactive)) {
continue;
}
@@ -1710,4 +1710,8 @@ public class EntityGraphRetriever {
return new HashSet<>(ret);
}
+
+ private boolean isInactiveEdge(Object element, boolean ignoreInactive) {
+ return ignoreInactive && element instanceof AtlasEdge && getStatus((AtlasEdge) element) != AtlasEntity.Status.ACTIVE;
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
index 6244b2d..8a81dc9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
@@ -54,13 +54,11 @@ public class ClassificationPropagateTaskFactory implements TaskFactory {
private final AtlasRelationshipStore relationshipStore;
@Inject
- public ClassificationPropagateTaskFactory(TaskManagement taskManagement, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+ public ClassificationPropagateTaskFactory(AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
this.graph = graph;
this.entityGraphMapper = entityGraphMapper;
this.deleteDelegate = deleteDelegate;
this.relationshipStore = relationshipStore;
-
- taskManagement.addFactory(this);
}
public org.apache.atlas.tasks.AbstractTask create(AtlasTask task) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
index 4fda34a..f86cbc7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
@@ -67,9 +67,10 @@ public class ClassificationPropagationTasks {
@Override
protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+ String entityGuid = (String) parameters.get(PARAM_ENTITY_GUID);
String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
- entityGraphMapper.deleteClassificationPropagation(classificationVertexId);
+ entityGraphMapper.deleteClassificationPropagation(entityGuid, classificationVertexId);
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
index 369db08..00c9caa 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
@@ -19,8 +19,10 @@ package org.apache.atlas.repository.store.graph.v2.tasks;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.exception.EntityNotFoundException;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
@@ -56,7 +58,11 @@ public abstract class ClassificationTask extends AbstractTask {
protected final DeleteHandlerDelegate deleteDelegate;
protected final AtlasRelationshipStore relationshipStore;
- public ClassificationTask(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+ public ClassificationTask(AtlasTask task,
+ AtlasGraph graph,
+ EntityGraphMapper entityGraphMapper,
+ DeleteHandlerDelegate deleteDelegate,
+ AtlasRelationshipStore relationshipStore) {
super(task);
this.graph = graph;
@@ -120,17 +126,15 @@ public abstract class ClassificationTask extends AbstractTask {
protected void setStatus(AtlasTask.Status status) {
super.setStatus(status);
- // remove pending task guid from entity vertex or relationship edge
- AtlasElement element;
-
- if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) {
- element = graph.getEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID));
-
- } else {
- element = AtlasGraphUtilsV2.findByGuid((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID));
+ try {
+ if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) {
+ entityGraphMapper.removePendingTaskFromEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID), getTaskGuid());
+ } else {
+ entityGraphMapper.removePendingTaskFromEntity((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID), getTaskGuid());
+ }
+ } catch (EntityNotFoundException | AtlasBaseException e) {
+ LOG.error("Error updating associated element for: {}", getTaskGuid(), e);
}
-
- element.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, getTaskGuid());
}
protected abstract void run(Map<String, Object> parameters) throws AtlasBaseException;
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
new file mode 100644
index 0000000..38f2cc9
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+@Component
+public class TaskFactoryRegistry {
+ private static final Logger LOG = LoggerFactory.getLogger(TaskFactoryRegistry.class);
+
+ @Inject
+ public TaskFactoryRegistry(TaskManagement taskManagement, Set<TaskFactory> factories) {
+ for (TaskFactory factory : factories) {
+ taskManagement.addFactory(factory);
+ }
+
+ LOG.info("TaskFactoryRegistry: TaskManagement updated with factories: {}", factories.size());
+ }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
index 264aa8c..2756504 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
@@ -73,7 +73,7 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) {
startInternal();
} else {
- LOG.info("TaskManagement.start(): deferring patches until instance activation");
+ LOG.info("TaskManagement.start(): deferring until instance activation");
}
}
@@ -183,9 +183,11 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
}
LOG.info("TaskManagement: Started!");
+
+ queuePendingTasks();
}
- public void queuePendingTasks() {
+ private void queuePendingTasks() {
if (AtlasConfiguration.TASKS_USE_ENABLED.getBoolean() == false) {
return;
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
index e309a76..84aefc9 100644
--- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -134,7 +134,7 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
assertNull(ret);
- ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY);
+ ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY, StringUtils.EMPTY);
assertNull(ret);
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
@@ -215,7 +215,7 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
assertNotNull(entityVertex);
assertNotNull(classificationVertex);
- List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(classificationVertex.getId().toString());
+ List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString());
assertNotNull(impactedEntities);
}
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 714b400..a9fa8ba 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -211,11 +211,6 @@ public class AdminResource {
}
}
- @PostConstruct
- public void init() {
- taskManagement.queuePendingTasks();
- }
-
/**
* Fetches the thread stack dump for this application.
*