You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/04/14 23:50:55 UTC
[atlas] branch branch-2.0 updated: ATLAS-3919: Handling classification propagation as deferred-action
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 6eb970c ATLAS-3919: Handling classification propagation as deferred-action
6eb970c is described below
commit 6eb970ce23d4ce290687512d3871db8f086a9e4f
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Apr 14 16:49:24 2021 -0700
ATLAS-3919: Handling classification propagation as deferred-action
Co-authored-by: Jayendra Parab <ja...@gmail.com>
Signed-off-by: Sarath Subramanian <sa...@apache.org>
(cherry picked from commit 0751f16250220271ce63ab7523de37e1f85defda)
---
.../org/apache/atlas/repository/Constants.java | 19 ++
distro/src/conf/atlas-log4j.xml | 15 ++
.../java/org/apache/atlas/AtlasConfiguration.java | 3 +-
.../apache/atlas/model/instance/AtlasEntity.java | 14 +
.../org/apache/atlas/model/tasks/AtlasTask.java | 192 ++++++++++++++
.../org/apache/atlas/type/AtlasEntityType.java | 1 +
.../org/apache/atlas/type/AtlasTypeRegistry.java | 2 +-
.../main/java/org/apache/atlas/type/Constants.java | 1 +
.../test/resources/atlas-application.properties | 3 +
.../apache/atlas/GraphTransactionInterceptor.java | 24 +-
.../repository/graph/GraphBackedSearchIndexer.java | 7 +
.../store/graph/v1/DeleteHandlerDelegate.java | 16 +-
.../repository/store/graph/v1/DeleteHandlerV1.java | 284 +++++++++++++++------
.../store/graph/v1/HardDeleteHandlerV1.java | 5 +-
.../store/graph/v1/SoftDeleteHandlerV1.java | 5 +-
.../store/graph/v2/AtlasGraphUtilsV2.java | 16 ++
.../store/graph/v2/AtlasRelationshipStoreV2.java | 159 +-----------
.../store/graph/v2/EntityGraphMapper.java | 236 ++++++++++++++++-
.../store/graph/v2/EntityGraphRetriever.java | 93 +++++--
.../store/graph/v2/bulkimport/MigrationImport.java | 4 +-
.../tasks/ClassificationPropagateTaskFactory.java | 93 +++++++
.../v2/tasks/ClassificationPropagationTasks.java | 89 +++++++
.../store/graph/v2/tasks/ClassificationTask.java | 137 ++++++++++
.../java/org/apache/atlas/tasks/AbstractTask.java | 68 +++++
.../java/org/apache/atlas/tasks/TaskExecutor.java | 181 +++++++++++++
.../java/org/apache/atlas/tasks/TaskFactory.java | 28 +-
.../org/apache/atlas/tasks/TaskManagement.java | 280 ++++++++++++++++++++
.../java/org/apache/atlas/tasks/TaskRegistry.java | 235 +++++++++++++++++
.../main/java/org/apache/atlas/util}/BeanUtil.java | 2 +-
.../test/java/org/apache/atlas/TestModules.java | 8 +-
.../atlas/discovery/AtlasDiscoveryServiceTest.java | 2 +-
.../ClassificationPropagationWithTasksTest.java | 227 ++++++++++++++++
.../org/apache/atlas/tasks/BaseTaskFixture.java | 116 +++++++++
.../org/apache/atlas/tasks/TaskExecutorTest.java | 123 +++++++++
.../org/apache/atlas/tasks/TaskManagementTest.java | 109 ++++++++
.../org/apache/atlas/tasks/TaskRegistryTest.java | 99 +++++++
.../main/java/org/apache/atlas/RequestContext.java | 20 +-
.../atlas/listener/ActiveStateChangeHandler.java | 4 +-
test-tools/pom.xml | 4 -
test-tools/src/main/resources/log4j.properties | 9 +
.../resources/solr/core-template/solrconfig.xml | 30 +--
.../apache/atlas/web/resources/AdminResource.java | 31 ++-
.../apache/atlas/web/service/EmbeddedServer.java | 6 +-
.../atlas/web/resources/AdminResourceTest.java | 4 +-
.../test/resources/atlas-application.properties | 3 +
webapp/src/test/resources/template_metadata.csv | 2 +-
46 files changed, 2663 insertions(+), 346 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 771287f..4df38a5 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -94,6 +94,7 @@ public final class Constants {
public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames");
public static final String CUSTOM_ATTRIBUTES_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "customAttributes");
public static final String LABELS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "labels");
+ public static final String PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks");
/**
* Patch vertices property keys.
@@ -208,6 +209,24 @@ public final class Constants {
public static final String TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE = "ADD_MANDATORY_ATTRIBUTE";
/*
+ * Task related constants
+ */
+ public static final String TASK_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "task_";
+ public static final String TASK_TYPE_PROPERTY_KEY = encodePropertyKey(TASK_PREFIX + "v_type");
+ public static final String TASK_TYPE_NAME = INTERNAL_PROPERTY_KEY_PREFIX + "AtlasTaskDef";
+ public static final String TASK_GUID = encodePropertyKey(TASK_PREFIX + "guid");
+ public static final String TASK_TYPE = encodePropertyKey(TASK_PREFIX + "type");
+ public static final String TASK_CREATED_TIME = encodePropertyKey(TASK_PREFIX + "timestamp");
+ public static final String TASK_UPDATED_TIME = encodePropertyKey(TASK_PREFIX + "modificationTimestamp");
+ public static final String TASK_CREATED_BY = encodePropertyKey(TASK_PREFIX + "createdBy");
+ public static final String TASK_STATUS = encodePropertyKey(TASK_PREFIX + "status");
+ public static final String TASK_ATTEMPT_COUNT = encodePropertyKey(TASK_PREFIX + "attemptCount");
+ public static final String TASK_PARAMETERS = encodePropertyKey(TASK_PREFIX + "parameters");
+ public static final String TASK_ERROR_MESSAGE = encodePropertyKey(TASK_PREFIX + "errorMessage");
+ public static final String TASK_START_TIME = encodePropertyKey(TASK_PREFIX + "startTime");
+ public static final String TASK_END_TIME = encodePropertyKey(TASK_PREFIX + "endTime");
+
+ /*
* All supported file-format extensions for Bulk Imports through file upload
*/
public enum SupportedFileExtensions { XLSX, XLS, CSV }
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index 7df963e..90e1301 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -57,6 +57,16 @@
</layout>
</appender>
+ <appender name="TASKS" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${atlas.log.dir}/tasks.log"/>
+ <param name="Append" value="true"/>
+ <param name="maxFileSize" value="259MB" />
+ <param name="maxBackupIndex" value="20" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%t %d %x %m%n (%C{1}:%L)"/>
+ </layout>
+ </appender>
+
<appender name="METRICS" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/metric.log"/>
<param name="Append" value="true"/>
@@ -144,6 +154,11 @@
<appender-ref ref="FAILED"/>
</logger>
+ <logger name="TASKS" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="TASKS"/>
+ </logger>
+
<root>
<priority value="warn"/>
<appender-ref ref="FILE"/>
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 08d6c9d..05b0784 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -76,7 +76,8 @@ public enum AtlasConfiguration {
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true),
REBUILD_INDEX("atlas.rebuild.index", false),
STORE_DIFFERENTIAL_AUDITS("atlas.entity.audit.differential", false),
- DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true);
+ DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true),
+ TASKS_USE_ENABLED("atlas.tasks.enabled", true);
private static final Configuration APPLICATION_PROPERTIES;
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 4d8c948..5f8c6e8 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -95,6 +95,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
private Map<String, String> customAttributes;
private Map<String, Map<String, Object>> businessAttributes;
private Set<String> labels;
+ private Set<String> pendingTasks; // read-only field i.e. value provided is ignored during entity create/update
@JsonIgnore
private static AtomicLong s_nextId = new AtomicLong(System.nanoTime());
@@ -220,6 +221,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
setCustomAttributes(other.getCustomAttributes());
setBusinessAttributes(other.getBusinessAttributes());
setLabels(other.getLabels());
+ setPendingTasks(other.getPendingTasks());
}
}
@@ -393,6 +395,14 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
this.labels = labels;
}
+ public Set<String> getPendingTasks() {
+ return pendingTasks;
+ }
+
+ public void setPendingTasks(Set<String> pendingTasks) {
+ this.pendingTasks = pendingTasks;
+ }
+
public List<AtlasClassification> getClassifications() { return classifications; }
public void setClassifications(List<AtlasClassification> classifications) { this.classifications = classifications; }
@@ -443,6 +453,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
setCustomAttributes(null);
setBusinessAttributes(null);
setLabels(null);
+ setPendingTasks(null);
}
private static String nextInternalId() {
@@ -486,6 +497,9 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
sb.append(", labels=[");
dumpObjects(labels, sb);
sb.append("]");
+ sb.append(", pendingTasks=[");
+ dumpObjects(pendingTasks, sb);
+ sb.append("]");
sb.append('}');
return sb;
diff --git a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java
new file mode 100644
index 0000000..7918f0b
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.tasks;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtlasTask {
+ @JsonIgnore
+ public static final int MAX_ATTEMPT_COUNT = 3;
+
+ public enum Status {
+ PENDING,
+ IN_PROGRESS,
+ COMPLETE,
+ FAILED;
+ }
+
+ private String type;
+ private String guid;
+ private String createdBy;
+ private Date createdTime;
+ private Date updatedTime;
+ private Date startTime;
+ private Date endTime;
+ private Map<String, Object> parameters;
+ private int attemptCount;
+ private String errorMessage;
+ private Status status;
+
+ public AtlasTask() {
+ }
+
+ public AtlasTask(String type, String createdBy, Map<String, Object> parameters) {
+ this.guid = UUID.randomUUID().toString();
+ this.type = type;
+ this.createdBy = createdBy;
+ this.createdTime = new Date();
+ this.updatedTime = this.createdTime;
+ this.parameters = parameters;
+ this.status = Status.PENDING;
+ this.attemptCount = 0;
+ }
+
+ public String getGuid() {
+ return guid;
+ }
+
+ public void setGuid(String guid) {
+ this.guid = guid;
+ }
+
+ public String getCreatedBy() {
+ return createdBy;
+ }
+
+ public void setCreatedBy(String createdBy) {
+ this.createdBy = createdBy;
+ }
+
+ public Date getCreatedTime() {
+ return createdTime;
+ }
+
+ public void setCreatedTime(Date createdTime) {
+ this.createdTime = createdTime;
+ }
+
+ public Date getUpdatedTime() {
+ return updatedTime;
+ }
+
+ public void setUpdatedTime(Date updatedTime) {
+ this.updatedTime = updatedTime;
+ }
+
+ public Map<String, Object> getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(Map<String, Object> val) {
+ this.parameters = val;
+ }
+
+ public void setType(String val) {
+ this.type = val;
+ }
+
+ public String getType() {
+ return this.type;
+ }
+
+ public void setStatus(String val) {
+ if (StringUtils.isNotEmpty(val)) {
+ this.status = Status.valueOf(val);
+ }
+ }
+
+ public void setStatus(Status val) {
+ this.status = val;
+ }
+
+ public Status getStatus() {
+ return this.status;
+ }
+
+ public int getAttemptCount() {
+ return attemptCount;
+ }
+
+ public void setAttemptCount(int attemptCount) {
+ this.attemptCount = attemptCount;
+ }
+
+ public void incrementAttemptCount() {
+ this.attemptCount++;
+ }
+
+ public void setStatusPending() {
+ this.status = Status.PENDING;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ public Date getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(Date endTime) {
+ this.endTime = endTime;
+ }
+
+ @JsonIgnore
+ public void start() {
+ this.setStatus(Status.IN_PROGRESS);
+ this.setStartTime(new Date());
+ }
+
+ @JsonIgnore
+ public void end() {
+ this.status = Status.COMPLETE;
+ this.setEndTime(new Date());
+ }
+
+ @JsonIgnore
+ public void updateStatusFromAttemptCount() {
+ setStatus((attemptCount < MAX_ATTEMPT_COUNT) ? AtlasTask.Status.PENDING : AtlasTask.Status.FAILED);
+ }
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 27c7f73..bfd5e98 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -1291,6 +1291,7 @@ public class AtlasEntityType extends AtlasStructType {
add(new AtlasAttributeDef(IS_INCOMPLETE_PROPERTY_KEY, ATLAS_TYPE_INT, false, true));
add(new AtlasAttributeDef(LABELS_PROPERTY_KEY, ATLAS_TYPE_STRING, false, true));
add(new AtlasAttributeDef(CUSTOM_ATTRIBUTES_PROPERTY_KEY, ATLAS_TYPE_STRING, false, true));
+ add(new AtlasAttributeDef(PENDING_TASKS_PROPERTY_KEY, ATLAS_TYPE_STRING, false, true));
}};
return new AtlasEntityDef(ENTITY_ROOT_NAME, "Root entity for system attributes", "1.0", attributeDefs);
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
index 4c7f8c6..0c05bb3 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
@@ -59,7 +59,7 @@ public class AtlasTypeRegistry {
registryData = new RegistryData();
updateSynchronizer = new TypeRegistryUpdateSynchronizer(this);
missingRelationshipDefs = new HashSet<>();
- commonIndexFieldNameCache = new HashMap<>();
+ commonIndexFieldNameCache = new LinkedHashMap<>();
resolveReferencesForRootTypes();
resolveIndexFieldNamesForRootTypes();
diff --git a/intg/src/main/java/org/apache/atlas/type/Constants.java b/intg/src/main/java/org/apache/atlas/type/Constants.java
index 7ee8520..14c9c7c 100644
--- a/intg/src/main/java/org/apache/atlas/type/Constants.java
+++ b/intg/src/main/java/org/apache/atlas/type/Constants.java
@@ -47,6 +47,7 @@ public final class Constants {
public static final String CLASSIFICATION_NAMES_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "classificationNames");
public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames");
public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
+ public static final String PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks");
//Classification-Only System Attributes
public static final String CLASSIFICATION_ENTITY_STATUS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityStatus");
diff --git a/intg/src/test/resources/atlas-application.properties b/intg/src/test/resources/atlas-application.properties
index 7e74d51..50ce01e 100644
--- a/intg/src/test/resources/atlas-application.properties
+++ b/intg/src/test/resources/atlas-application.properties
@@ -144,3 +144,6 @@ atlas.authentication.method.kerberos=false
######### Gremlin Search Configuration #########
# Set to false to disable gremlin search.
atlas.search.gremlin.enable=true
+
+######### Configure use of Tasks #########
+atlas.tasks.enabled=false
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index 86b369f..343d00d 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -26,7 +26,9 @@ import org.apache.atlas.exception.NotFoundException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
+import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -53,7 +55,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
private static final ThreadLocal<Boolean> innerFailure = ThreadLocal.withInitial(() -> Boolean.FALSE);
private static final ThreadLocal<Map<String, AtlasVertex>> guidVertexCache = ThreadLocal.withInitial(() -> new HashMap<>());
- private final AtlasGraph graph;
+ private final AtlasGraph graph;
+ private final TaskManagement taskManagement;
private static final ThreadLocal<Map<Object, String>> vertexGuidCache =
new ThreadLocal<Map<Object, String>>() {
@@ -80,8 +83,9 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
};
@Inject
- public GraphTransactionInterceptor(AtlasGraph graph) {
- this.graph = graph;
+ public GraphTransactionInterceptor(AtlasGraph graph, TaskManagement taskManagement) {
+ this.graph = graph;
+ this.taskManagement = taskManagement;
}
@Override
@@ -89,7 +93,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
Method method = invocation.getMethod();
String invokingClass = method.getDeclaringClass().getSimpleName();
String invokedMethodName = method.getName();
- boolean logRollback = method.getAnnotation(GraphTransaction.class).logRollback();
+ boolean logRollback = method.getAnnotation(GraphTransaction.class) == null || method.getAnnotation(GraphTransaction.class).logRollback();
boolean isInnerTxn = isTxnOpen.get();
// Outermost txn marks any subsequent transaction as inner
@@ -164,6 +168,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects();
+
+ if (isSuccess) {
+ submitTasks();
+ }
}
}
@@ -231,6 +239,14 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
edgeStateCache.get().clear();
}
+ private void submitTasks() {
+ if (CollectionUtils.isEmpty(RequestContext.get().getQueuedTasks()) || taskManagement == null) {
+ return;
+ }
+
+ taskManagement.addAll(RequestContext.get().getQueuedTasks());
+ }
+
boolean logException(Throwable t) {
if (t instanceof AtlasBaseException) {
Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index ca03492..47c8b14 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -334,6 +334,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createCommonVertexIndex(management, PROPAGATED_CLASSIFICATION_NAMES_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, true);
createCommonVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, LIST, true, true);
+ createCommonVertexIndex(management, PENDING_TASKS_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, false);
createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, UniqueKind.NONE, Integer.class, SINGLE, true, true);
createCommonVertexIndex(management, CUSTOM_ATTRIBUTES_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, LABELS_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
@@ -344,6 +345,12 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createCommonVertexIndex(management, PATCH_ACTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_STATE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
+ // tasks
+ createCommonVertexIndex(management, TASK_GUID, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
+ createCommonVertexIndex(management, TASK_TYPE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
+ createCommonVertexIndex(management, TASK_CREATED_TIME, UniqueKind.NONE, Long.class, SINGLE, true, false);
+ createCommonVertexIndex(management, TASK_STATUS, UniqueKind.NONE, String.class, SINGLE, true, false);
+
// create vertex-centric index
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerDelegate.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerDelegate.java
index 1aaea64..0c5ece0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerDelegate.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerDelegate.java
@@ -21,6 +21,7 @@ package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.RequestContext;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.DeleteType;
+import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.slf4j.Logger;
@@ -38,13 +39,15 @@ public class DeleteHandlerDelegate {
private final SoftDeleteHandlerV1 softDeleteHandler;
private final HardDeleteHandlerV1 hardDeleteHandler;
private final DeleteHandlerV1 defaultHandler;
- private final AtlasGraph atlasGraph;
+ private final AtlasGraph graph;
+ private final TaskManagement taskManagement;
@Inject
- public DeleteHandlerDelegate(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
- this.atlasGraph = atlasGraph;
- this.softDeleteHandler = new SoftDeleteHandlerV1(atlasGraph, typeRegistry);
- this.hardDeleteHandler = new HardDeleteHandlerV1(atlasGraph, typeRegistry);
+ public DeleteHandlerDelegate(AtlasGraph graph, AtlasTypeRegistry typeRegistry, TaskManagement taskManagement) {
+ this.graph = graph;
+ this.taskManagement = taskManagement;
+ this.softDeleteHandler = new SoftDeleteHandlerV1(graph, typeRegistry, taskManagement);
+ this.hardDeleteHandler = new HardDeleteHandlerV1(graph, typeRegistry, taskManagement);
this.defaultHandler = getDefaultConfiguredHandler(typeRegistry);
}
@@ -77,7 +80,8 @@ public class DeleteHandlerDelegate {
LOG.info("Default delete handler set to: {}", handlerFromProperties.getName());
- ret = (DeleteHandlerV1) handlerFromProperties.getConstructor(AtlasGraph.class, AtlasTypeRegistry.class).newInstance(this.atlasGraph, typeRegistry);
+ ret = (DeleteHandlerV1) handlerFromProperties.getConstructor(AtlasGraph.class, AtlasTypeRegistry.class, TaskManagement.class)
+ .newInstance(this.graph, typeRegistry, taskManagement);
} catch (Exception ex) {
LOG.error("Error instantiating default delete handler. Defaulting to: {}", softDeleteHandler.getClass().getName(), ex);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 7b2e2d3..20d5e6f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.store.graph.v1;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
@@ -26,6 +27,8 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.graph.AtlasEdgeLabel;
@@ -37,6 +40,8 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.DeleteType;
+import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationTask;
+import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
@@ -59,23 +64,31 @@ import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
public abstract class DeleteHandlerV1 {
public static final Logger LOG = LoggerFactory.getLogger(DeleteHandlerV1.class);
- protected final GraphHelper graphHelper;
- private final AtlasTypeRegistry typeRegistry;
- private final EntityGraphRetriever entityRetriever;
- private final boolean shouldUpdateInverseReferences;
- private final boolean softDelete;
+ private static final boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
- public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete) {
+ protected final GraphHelper graphHelper;
+ private final AtlasTypeRegistry typeRegistry;
+ private final EntityGraphRetriever entityRetriever;
+ private final boolean shouldUpdateInverseReferences;
+ private final boolean softDelete;
+ private final TaskManagement taskManagement;
+
+
+ public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete, TaskManagement taskManagement) {
this.typeRegistry = typeRegistry;
this.graphHelper = new GraphHelper(graph);
this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.shouldUpdateInverseReferences = shouldUpdateInverseReference;
this.softDelete = softDelete;
+ this.taskManagement = taskManagement;
}
/**
@@ -383,16 +396,24 @@ public abstract class DeleteHandlerV1 {
}
private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
- final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
- final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, getRelationshipGuid(edge)) : null;
+ final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
+ String relationshipGuid = getRelationshipGuid(edge);
- if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), propagatedEntityVertices.size());
+ if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
+ for (AtlasVertex classificationVertex : classificationVertices) {
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, toVertex, classificationVertex.getIdForDisplay(), relationshipGuid);
}
+ } else {
+ final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, relationshipGuid) : null;
- for (AtlasVertex classificationVertex : classificationVertices) {
- addTagPropagation(classificationVertex, propagatedEntityVertices);
+ if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), propagatedEntityVertices.size());
+ }
+
+ for (AtlasVertex classificationVertex : classificationVertices) {
+ addTagPropagation(classificationVertex, propagatedEntityVertices);
+ }
}
}
}
@@ -561,75 +582,6 @@ public abstract class DeleteHandlerV1 {
}
}
- public void removeTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException {
- if (edge == null) {
- return;
- }
-
- AtlasVertex outVertex = edge.getOutVertex();
- AtlasVertex inVertex = edge.getInVertex();
-
- if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
- removeTagPropagation(outVertex, inVertex, edge);
- }
-
- if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
- removeTagPropagation(inVertex, outVertex, edge);
- }
- }
-
- private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
- final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
- final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, getRelationshipGuid(edge)) : null;
-
- if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing {} propagated tags: for {} from {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size());
- }
-
- for (AtlasVertex classificationVertex : classificationVertices) {
- String classificationName = getTypeName(classificationVertex);
- AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
- List<AtlasVertex> referrals = entityRetriever.getIncludedImpactedVerticesV2(associatedEntityVertex, getRelationshipGuid(edge));
-
- for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
- if (referrals.contains(impactedEntityVertex)) {
- if (LOG.isDebugEnabled()) {
- if (org.apache.commons.lang3.StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) {
- LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]",
- getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName, getTypeName(associatedEntityVertex));
- } else {
- LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path",
- getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName);
- }
- }
-
- continue;
- }
-
- // remove propagated classification edge and classificationName from propagatedTraitNames vertex property
- AtlasEdge propagatedEdge = getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex);
-
- if (propagatedEdge != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" --> Removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}]",
- getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
- }
-
- graphHelper.removeEdge(propagatedEdge);
-
- removeFromPropagatedClassificationNames(impactedEntityVertex, classificationName);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist",
- getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
- }
- }
- }
- }
- }
- }
-
public void deletePropagatedClassification(AtlasVertex entityVertex, String classificationName, String associatedEntityGuid) throws AtlasBaseException {
AtlasEdge propagatedEdge = getPropagatedClassificationEdge(entityVertex, classificationName, associatedEntityGuid);
@@ -1059,7 +1011,11 @@ public abstract class DeleteHandlerV1 {
boolean removePropagations = getRemovePropagations(classificationVertex);
if (isClassificationEdge && removePropagations) {
- removeTagPropagation(classificationVertex);
+ if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, instanceVertex, classificationVertex.getIdForDisplay(), null);
+ } else {
+ removeTagPropagation(classificationVertex);
+ }
}
deleteEdgeReference(edge, CLASSIFICATION, false, false, instanceVertex);
@@ -1089,4 +1045,166 @@ public abstract class DeleteHandlerV1 {
return ret;
}
+
+ /**
+ * Reconciles classification propagation after a relationship's propagateTags setting may have changed.
+ *
+ * When the value on the relationship differs from the value currently stored on the edge, the
+ * classification-to-entities mapping is captured before and after the edge property is rewritten,
+ * and the difference between the two is applied through addTagPropagation / removeTagPropagation.
+ * When the value is unchanged, only the blocked propagated classifications are processed.
+ *
+ * @param edge         relationship edge whose propagation property is read and, on change, rewritten
+ * @param relationship relationship supplying the new propagateTags value and blocked classifications
+ * @throws AtlasBaseException declared by this method; NOTE(review): presumably raised by the propagation helpers - confirm
+ */
+ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
+ PropagateTags oldTagPropagation = getPropagateTags(edge);
+ PropagateTags newTagPropagation = relationship.getPropagateTags();
+
+ if (newTagPropagation != oldTagPropagation) {
+ // capture the state BEFORE the edge property is modified; order matters here
+ List<AtlasVertex> currentClassificationVertices = getPropagatableClassifications(edge);
+ Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
+
+ // Update propagation edge
+ AtlasGraphUtilsV2.setEncodedProperty(edge, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
+
+ // re-read after the update; the mapping is computed over old and new classifications together
+ List<AtlasVertex> updatedClassificationVertices = getPropagatableClassifications(edge);
+ List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
+ Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
+
+ // compute add/remove propagations list
+ Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>();
+ Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
+
+ if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) {
+ // nothing was propagated before: everything in the updated mapping is an addition
+ addPropagationsMap.putAll(updatedClassificationsMap);
+
+ } else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
+ // nothing is propagated any more: everything in the previous mapping is a removal
+ removePropagationsMap.putAll(currentClassificationsMap);
+
+ } else {
+ // both mappings populated (or both empty): diff the entity lists per classification
+ // NOTE(review): assumes getClassificationPropagatedEntitiesMapping never returns null - confirm
+ for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
+ List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
+ List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
+ List<AtlasVertex> entitiesAdded = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities);
+ List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
+
+ if (CollectionUtils.isNotEmpty(entitiesAdded)) {
+ addPropagationsMap.put(classificationVertex, entitiesAdded);
+ }
+
+ if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
+ removePropagationsMap.put(classificationVertex, entitiesRemoved);
+ }
+ }
+ }
+
+ // apply additions first, then removals
+ for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
+ List<AtlasVertex> entitiesToAddPropagation = addPropagationsMap.get(classificationVertex);
+
+ addTagPropagation(classificationVertex, entitiesToAddPropagation);
+ }
+
+ for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
+ List<AtlasVertex> entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex);
+
+ removeTagPropagation(classificationVertex, entitiesToRemovePropagation);
+ }
+ } else {
+ // update blocked propagated classifications only if there is no change in tag propagation (don't update both)
+ handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
+ }
+ }
+
+    /**
+     * Replaces the blocked-classification list stored on a relationship edge and re-evaluates
+     * propagation for every classification whose blocked state changed as a result.
+     *
+     * Requested classifications that do not match any classification propagatable over the edge
+     * are ignored. Passing {@code null} leaves the edge untouched.
+     *
+     * @param edge                   relationship edge carrying the blocked-classification property
+     * @param blockedClassifications classifications that should be blocked on this edge; may be null
+     * @throws AtlasBaseException declared by this method; NOTE(review): presumably raised while adding/removing propagations - confirm
+     */
+    public void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedClassifications) throws AtlasBaseException {
+        if (blockedClassifications == null) {
+            return;
+        }
+
+        final List<AtlasVertex> candidates        = getPropagatableClassifications(edge);
+        final List<String>      previousIds       = getBlockedClassificationIds(edge);
+        final List<AtlasVertex> previouslyBlocked = getVerticesForIds(candidates, previousIds);
+        final List<AtlasVertex> nowBlocked        = new ArrayList<>();
+        final List<String>      nowBlockedIds     = new ArrayList<>();
+
+        // keep only the requested classifications that are actually propagatable over this edge
+        for (AtlasClassification requested : blockedClassifications) {
+            final AtlasVertex match = validateBlockedPropagatedClassification(candidates, requested);
+
+            if (match != null) {
+                nowBlocked.add(match);
+                nowBlockedIds.add(match.getIdForDisplay());
+            }
+        }
+
+        setBlockedClassificationIds(edge, nowBlockedIds);
+
+        // symmetric difference: classifications that became blocked or became unblocked
+        final List<AtlasVertex> changed = (List<AtlasVertex>) CollectionUtils.disjunction(nowBlocked, previouslyBlocked);
+
+        for (AtlasVertex changedClassification : changed) {
+            final List<AtlasVertex> toAdd    = new ArrayList<>();
+            final List<AtlasVertex> toRemove = new ArrayList<>();
+
+            entityRetriever.evaluateClassificationPropagation(changedClassification, toAdd, toRemove);
+
+            if (!toAdd.isEmpty()) {
+                addTagPropagation(changedClassification, toAdd);
+            }
+
+            if (!toRemove.isEmpty()) {
+                removeTagPropagation(changedClassification, toRemove);
+            }
+        }
+    }
+
+    /**
+     * Selects the vertices whose display id appears in {@code vertexIds}.
+     *
+     * @param vertices  vertices to filter; iterated only when {@code vertexIds} is non-empty
+     * @param vertexIds display ids to keep; null or empty yields an empty result
+     * @return a new list holding the matching vertices in their original order (never null)
+     */
+    private List<AtlasVertex> getVerticesForIds(List<AtlasVertex> vertices, List<String> vertexIds) {
+        final List<AtlasVertex> matches = new ArrayList<>();
+
+        if (CollectionUtils.isEmpty(vertexIds)) {
+            return matches;
+        }
+
+        for (AtlasVertex candidate : vertices) {
+            if (vertexIds.contains(candidate.getIdForDisplay())) {
+                matches.add(candidate);
+            }
+        }
+
+        return matches;
+    }
+
+    /**
+     * Looks up the propagatable classification vertex that corresponds to a classification the
+     * caller wants to block; a classification can only be blocked if it is among the propagated ones.
+     *
+     * @param classificationVertices classification vertices to search
+     * @param classification         classification to locate, matched on type name and entity guid
+     * @return the first vertex matching both type name and entity guid, or null when none matches
+     */
+    private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
+        for (AtlasVertex candidate : classificationVertices) {
+            final String candidateName       = getClassificationName(candidate);
+            final String candidateEntityGuid = getClassificationEntityGuid(candidate);
+            final boolean sameType           = candidateName.equals(classification.getTypeName());
+
+            if (sameType && candidateEntityGuid.equals(classification.getEntityGuid())) {
+                return candidate;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Stores the blocked classification ids on the relationship edge, or clears the property
+     * altogether when the list is empty. Does nothing for a null edge.
+     *
+     * @param edge              relationship edge to update; may be null
+     * @param classificationIds ids of the classification vertices to block; must not be null
+     */
+    private void setBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
+        if (edge == null) {
+            return;
+        }
+
+        final String blockedKey = org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY;
+
+        if (!classificationIds.isEmpty()) {
+            edge.setListProperty(blockedKey, classificationIds);
+        } else {
+            edge.removeProperty(blockedKey);
+        }
+    }
+
+    /**
+     * Creates a classification task for an entity, records the task's guid on the entity vertex
+     * under the pending-tasks property, and queues the task on the current request context.
+     *
+     * Dereferences {@code taskManagement} unconditionally; the callers in this class check it for
+     * null before calling.
+     *
+     * @param taskType               task type identifier handed to the task manager
+     * @param entityVertex           entity vertex the task refers to; also receives the pending-task marker
+     * @param classificationVertexId display id of the classification vertex involved
+     * @param relationshipGuid       guid of the relationship involved; the delete path passes null
+     */
+    public void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid) {
+        final String              requestUser = RequestContext.getCurrentUser();
+        final String              guid        = GraphHelper.getGuid(entityVertex);
+        final Map<String, Object> parameters  = ClassificationTask.toParameters(guid, classificationVertexId, relationshipGuid);
+        final AtlasTask           queuedTask  = taskManagement.createTask(taskType, requestUser, parameters);
+
+        // mark the entity as having this task pending before handing the task to the request context
+        AtlasGraphUtilsV2.addEncodedProperty(entityVertex, PENDING_TASKS_PROPERTY_KEY, queuedTask.getGuid());
+        RequestContext.get().queueTask(queuedTask);
+    }
+
+    /**
+     * Creates a classification task for a relationship, records the task's guid on the relationship
+     * edge under the pending-tasks property, and queues the task on the current request context.
+     *
+     * NOTE(review): {@code taskManagement} is dereferenced unconditionally here, while the
+     * relationship-store caller appears to check only the deferred-action flag - confirm it can
+     * never be null on that path.
+     *
+     * @param taskType         task type identifier handed to the task manager
+     * @param relationshipEdge relationship edge the task refers to; also receives the pending-task marker
+     * @param relationship     relationship whose state is captured in the task parameters
+     */
+    public void createAndQueueTask(String taskType, AtlasEdge relationshipEdge, AtlasRelationship relationship) {
+        final String              requestUser = RequestContext.getCurrentUser();
+        final String              edgeId      = relationshipEdge.getIdForDisplay();
+        final Map<String, Object> parameters  = ClassificationTask.toParameters(edgeId, relationship);
+        final AtlasTask           queuedTask  = taskManagement.createTask(taskType, requestUser, parameters);
+
+        // mark the edge as having this task pending before handing the task to the request context
+        AtlasGraphUtilsV2.addEncodedProperty(relationshipEdge, PENDING_TASKS_PROPERTY_KEY, queuedTask.getGuid());
+        RequestContext.get().queueTask(queuedTask);
+    }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java
index c241e23..eebb40a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java
@@ -24,6 +24,7 @@ import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.springframework.stereotype.Component;
@@ -34,8 +35,8 @@ import javax.inject.Inject;
public class HardDeleteHandlerV1 extends DeleteHandlerV1 {
@Inject
- public HardDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
- super(graph, typeRegistry, true, false);
+ public HardDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, TaskManagement taskManagement) {
+ super(graph, typeRegistry, true, false, taskManagement);
}
@Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java
index bede9c3..1fc7cb7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java
@@ -26,6 +26,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import javax.inject.Inject;
@@ -38,8 +39,8 @@ import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
public class SoftDeleteHandlerV1 extends DeleteHandlerV1 {
@Inject
- public SoftDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
- super(graph, typeRegistry, false, true);
+ public SoftDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, TaskManagement taskManagement) {
+ super(graph, typeRegistry, false, true, taskManagement);
}
@Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index b2be41c..8d4fdf3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -192,6 +192,22 @@ public class AtlasGraphUtilsV2 {
return addProperty(vertex, propertyName, value, true);
}
+    /**
+     * Appends a value to a list-valued property of an edge, creating the list when the property
+     * is not set yet. The property is removed and then written back with the extended list.
+     *
+     * @param edge         edge to update
+     * @param propertyName name of the list property; used as given
+     * @param value        value to append
+     * @return the same edge instance, for chaining
+     */
+    public static AtlasEdge addEncodedProperty(AtlasEdge edge, String propertyName, String value) {
+        final List<String> existing = edge.getListProperty(propertyName);
+        final List<String> extended = (existing != null) ? existing : new ArrayList<>();
+
+        extended.add(value);
+
+        // drop the old value first, then store the extended list
+        edge.removeProperty(propertyName);
+        edge.setListProperty(propertyName, extended);
+
+        return edge;
+    }
+
public static AtlasVertex addProperty(AtlasVertex vertex, String propertyName, Object value, boolean isEncoded) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> addProperty({}, {}, {})", toString(vertex), propertyName, value);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
index 8d74489..a0fd71f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -18,6 +18,7 @@
*/
package org.apache.atlas.repository.store.graph.v2;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
@@ -26,8 +27,6 @@ import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.authorize.AtlasRelationshipAccessRequest;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
-import org.apache.atlas.model.instance.AtlasClassification;
-import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
@@ -51,7 +50,6 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasPerfMetrics;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -61,7 +59,6 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -69,10 +66,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import java.util.function.Function;
import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED;
-import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
@@ -84,13 +79,9 @@ import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
-import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
-import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
-import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;
@Component
public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
@@ -98,7 +89,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
private final AtlasGraph graph;
- private boolean notificationsEnabled = NOTIFICATION_RELATIONSHIPS_ENABLED.getBoolean();
+ private boolean notificationsEnabled = NOTIFICATION_RELATIONSHIPS_ENABLED.getBoolean();
+ private boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
@@ -401,7 +393,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
AtlasGraphUtilsV2.setEncodedProperty(ret, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name());
// blocked propagated classifications
- handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
+ deleteDelegate.getHandler().handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
// propagate tags
deleteDelegate.getHandler().addTagPropagation(ret, tagPropagation);
@@ -459,140 +451,11 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
}
- private void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedClassifications) throws AtlasBaseException {
- if (blockedClassifications != null) {
- List<AtlasVertex> propagatableClassifications = getPropagatableClassifications(edge);
- List<String> currBlockedClassificationIds = getBlockedClassificationIds(edge);
- List<AtlasVertex> currBlockedClassifications = getVerticesForIds(propagatableClassifications, currBlockedClassificationIds);
- List<AtlasVertex> classificationsToBlock = new ArrayList<>();
- List<String> classificationIdsToBlock = new ArrayList<>();
-
- for (AtlasClassification blockedClassification : blockedClassifications) {
- AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatableClassifications, blockedClassification);
-
- if (classificationVertex != null) {
- classificationsToBlock.add(classificationVertex);
- classificationIdsToBlock.add(classificationVertex.getIdForDisplay());
- }
- }
-
- setBlockedClassificationIds(edge, classificationIdsToBlock);
-
- List<AtlasVertex> propagationChangedClassifications = (List<AtlasVertex>) CollectionUtils.disjunction(classificationsToBlock, currBlockedClassifications);
-
- for (AtlasVertex classificationVertex : propagationChangedClassifications) {
- List<AtlasVertex> propagationsToRemove = new ArrayList<>();
- List<AtlasVertex> propagationsToAdd = new ArrayList<>();
-
- entityRetriever.evaluateClassificationPropagation(classificationVertex, propagationsToAdd, propagationsToRemove);
-
- if (CollectionUtils.isNotEmpty(propagationsToAdd)) {
- deleteDelegate.getHandler().addTagPropagation(classificationVertex, propagationsToAdd);
- }
-
- if (CollectionUtils.isNotEmpty(propagationsToRemove)) {
- deleteDelegate.getHandler().removeTagPropagation(classificationVertex, propagationsToRemove);
- }
- }
- }
- }
-
- private List<AtlasVertex> getVerticesForIds(List<AtlasVertex> vertices, List<String> vertexIds) {
- List<AtlasVertex> ret = new ArrayList<>();
-
- if (CollectionUtils.isNotEmpty(vertexIds)) {
- for (AtlasVertex vertex : vertices) {
- String vertexId = vertex.getIdForDisplay();
-
- if (vertexIds.contains(vertexId)) {
- ret.add(vertex);
- }
- }
- }
-
- return ret;
- }
-
- // propagated classifications should contain blocked propagated classification
- private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
- AtlasVertex ret = null;
-
- for (AtlasVertex vertex : classificationVertices) {
- String classificationName = getClassificationName(vertex);
- String entityGuid = getClassificationEntityGuid(vertex);
-
- if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) {
- ret = vertex;
- break;
- }
- }
-
- return ret;
- }
-
- private void setBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
- if (edge != null) {
- if (classificationIds.isEmpty()) {
- edge.removeProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
- } else {
- edge.setListProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
- }
- }
- }
-
- private void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
- PropagateTags oldTagPropagation = getPropagateTags(edge);
- PropagateTags newTagPropagation = relationship.getPropagateTags();
-
- if (newTagPropagation != oldTagPropagation) {
- List<AtlasVertex> currentClassificationVertices = getPropagatableClassifications(edge);
- Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
-
- // Update propagation edge
- AtlasGraphUtilsV2.setEncodedProperty(edge, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
-
- List<AtlasVertex> updatedClassificationVertices = getPropagatableClassifications(edge);
- List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
- Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
-
- // compute add/remove propagations list
- Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>();
- Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
-
- if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) {
- addPropagationsMap.putAll(updatedClassificationsMap);
-
- } else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
- removePropagationsMap.putAll(currentClassificationsMap);
-
- } else {
- for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
- List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
- List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
-
- List<AtlasVertex> entitiesAdded = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities);
- List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
-
- if (CollectionUtils.isNotEmpty(entitiesAdded)) {
- addPropagationsMap.put(classificationVertex, entitiesAdded);
- }
-
- if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
- removePropagationsMap.put(classificationVertex, entitiesRemoved);
- }
- }
- }
-
- for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
- deleteDelegate.getHandler().addTagPropagation(classificationVertex, addPropagationsMap.get(classificationVertex));
- }
-
- for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
- deleteDelegate.getHandler().removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex));
- }
+ private void updateTagPropagations(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException {
+ if (DEFERRED_ACTION_ENABLED) {
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE, relationshipEdge, relationship);
} else {
- // update blocked propagated classifications only if there is no change is tag propagation (don't update both)
- handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
+ deleteDelegate.getHandler().updateTagPropagations(relationshipEdge, relationship);
}
}
@@ -934,4 +797,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate);
}
}
+
+ /**
+ * Forwards task creation for a relationship edge to the delete handler currently supplied by deleteDelegate.
+ *
+ * @param taskType         task type identifier passed through unchanged
+ * @param relationshipEdge relationship edge the task refers to
+ * @param relationship     relationship passed through to the handler
+ */
+ private void createAndQueueTask(String taskType, AtlasEdge relationshipEdge, AtlasRelationship relationship) {
+ deleteDelegate.getHandler().createAndQueueTask(taskType, relationshipEdge, relationship);
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 2cfcc0b..d8ef32b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -18,10 +18,12 @@
package org.apache.atlas.repository.store.graph.v2;
+ import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
+import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TimeBoundary;
import org.apache.atlas.model.TypeCategory;
@@ -39,8 +41,8 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
-import org.apache.atlas.repository.graph.IFullTextMapper;
import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graph.IFullTextMapper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -48,12 +50,13 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
-import org.apache.atlas.type.AtlasArrayType;
+ import org.apache.atlas.tasks.TaskManagement;
+ import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes;
+import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasMapType;
-import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
@@ -73,7 +76,17 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -89,9 +102,9 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PA
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
import static org.apache.atlas.repository.Constants.*;
-import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
+import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
import static org.apache.atlas.repository.graph.GraphHelper.getDelimitedClassificationNames;
import static org.apache.atlas.repository.graph.GraphHelper.getLabels;
import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
@@ -107,6 +120,8 @@ import static org.apache.atlas.repository.graph.GraphHelper.string;
import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
@@ -128,6 +143,7 @@ public class EntityGraphMapper {
private static final boolean ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES = AtlasConfiguration.ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES.getBoolean();
private static final boolean CLASSIFICATION_PROPAGATION_DEFAULT = AtlasConfiguration.CLASSIFICATION_PROPAGATION_DEFAULT.getBoolean();
+ private boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
private final GraphHelper graphHelper;
private final AtlasGraph graph;
@@ -137,12 +153,14 @@ public class EntityGraphMapper {
private final IAtlasEntityChangeNotifier entityChangeNotifier;
private final AtlasInstanceConverter instanceConverter;
private final EntityGraphRetriever entityRetriever;
- private final IFullTextMapper fullTextMapperV2;
+ private final IFullTextMapper fullTextMapperV2;
+ private final TaskManagement taskManagement;
@Inject
public EntityGraphMapper(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, AtlasGraph graph,
AtlasRelationshipStore relationshipStore, IAtlasEntityChangeNotifier entityChangeNotifier,
- AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2) {
+ AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2,
+ TaskManagement taskManagement) {
this.graphHelper = new GraphHelper(graph);
this.deleteDelegate = deleteDelegate;
this.typeRegistry = typeRegistry;
@@ -152,6 +170,12 @@ public class EntityGraphMapper {
this.instanceConverter = instanceConverter;
this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.fullTextMapperV2 = fullTextMapperV2;
+ this.taskManagement = taskManagement;
+ }
+
+ /**
+ * Overrides DEFERRED_ACTION_ENABLED, the flag checked (together with a non-null taskManagement)
+ * before classification propagation is handed to createAndQueueTask instead of being done inline.
+ * The field is initialised from AtlasConfiguration.TASKS_USE_ENABLED; this setter is marked
+ * VisibleForTesting.
+ *
+ * @param value new value of the deferred-action flag
+ */
+ @VisibleForTesting
+ public void setTasksUseFlag(boolean value) {
+ DEFERRED_ACTION_ENABLED = value;
}
public AtlasVertex createVertex(AtlasEntity entity) throws AtlasBaseException {
@@ -1943,6 +1967,12 @@ public class EntityGraphMapper {
LOG.debug("created vertex {} for trait {}", string(classificationVertex), classificationName);
}
+ if (propagateTags && taskManagement != null && DEFERRED_ACTION_ENABLED) {
+ propagateTags = false;
+
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, entityVertex, classificationVertex.getIdForDisplay());
+ }
+
// add the attributes for the trait instance
mapClassification(EntityOperation.CREATE, context, classification, entityType, entityVertex, classificationVertex);
updateModificationMetadata(entityVertex);
@@ -2000,6 +2030,58 @@ public class EntityGraphMapper {
}
}
+    /**
+     * Propagates a classification from its owning entity to the vertices returned by
+     * entityRetriever.getImpactedVerticesV2, then calls updateClassificationText for the vertices
+     * that received it and sends the "classifications added" notification.
+     *
+     * @param entityGuid             GUID of the entity the classification is attached to
+     * @param classificationVertexId id of the classification vertex to propagate
+     * @param relationshipGuid       relationship GUID handed to the impacted-vertex lookup as the relationship to exclude
+     * @return GUIDs of the entities the classification was propagated to; null when an input is
+     *         empty, a vertex cannot be found, or nothing was propagated
+     * @throws AtlasBaseException wrapping any exception raised during propagation
+     */
+    @GraphTransaction
+    public List<String> propagateClassification(String entityGuid, String classificationVertexId, String relationshipGuid) throws AtlasBaseException {
+        try {
+            if (!(StringUtils.isNotEmpty(entityGuid) && StringUtils.isNotEmpty(classificationVertexId))) {
+                LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);
+
+                return null;
+            }
+
+            final AtlasVertex sourceVertex = graphHelper.getVertexForGUID(entityGuid);
+
+            if (sourceVertex == null) {
+                LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
+
+                return null;
+            }
+
+            final AtlasVertex tagVertex = graph.getVertex(classificationVertexId);
+
+            if (tagVertex == null) {
+                LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
+
+                return null;
+            }
+
+            final List<AtlasVertex> candidates = entityRetriever.getImpactedVerticesV2(sourceVertex, relationshipGuid, classificationVertexId);
+
+            if (CollectionUtils.isEmpty(candidates)) {
+                LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId);
+
+                return null;
+            }
+
+            // the classification is read before the propagation edges are added, as in the original flow
+            final AtlasClassification tag              = entityRetriever.toAtlasClassification(tagVertex);
+            final List<AtlasVertex>   receivingVertices = deleteDelegate.getHandler().addTagPropagation(tagVertex, candidates);
+
+            if (CollectionUtils.isEmpty(receivingVertices)) {
+                return null;
+            }
+
+            final List<AtlasEntity> receivingEntities = updateClassificationText(tag, receivingVertices);
+
+            entityChangeNotifier.onClassificationsAddedToEntities(receivingEntities, Collections.singletonList(tag));
+
+            final List<String> guids = new ArrayList<>(receivingEntities.size());
+
+            for (AtlasEntity receivingEntity : receivingEntities) {
+                guids.add(receivingEntity.getGuid());
+            }
+
+            return guids;
+        } catch (Exception e) {
+            LOG.error("propagateClassification(entityGuid={}, classificationVertexId={}): error while propagating classification", entityGuid, classificationVertexId, e);
+
+            throw new AtlasBaseException(e);
+        }
+    }
+
public void deleteClassification(String entityGuid, String classificationName, String associatedEntityGuid) throws AtlasBaseException {
if (StringUtils.isEmpty(associatedEntityGuid) || associatedEntityGuid.equals(entityGuid)) {
deleteClassification(entityGuid, classificationName);
@@ -2058,10 +2140,16 @@ public class EntityGraphMapper {
final List<AtlasVertex> entityVertices;
if (isPropagationEnabled(classificationVertex)) {
- entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
+ if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, entityVertex, classificationVertex.getIdForDisplay());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Number of propagations to delete -> {}", entityVertices.size());
+ entityVertices = new ArrayList<>();
+ } else {
+ entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of propagations to delete -> {}", entityVertices.size());
+ }
}
} else {
entityVertices = new ArrayList<>();
@@ -2267,6 +2355,27 @@ public class EntityGraphMapper {
Boolean currentTagPropagation = currentClassification.isPropagate();
Boolean updatedTagPropagation = classification.isPropagate();
+ /* -----------------------------
+ | Current Tag | Updated Tag |
+ | Propagation | Propagation |
+ |-------------|-------------|
+ | true | true | => no-op
+ |-------------|-------------|
+ | false | false | => no-op
+ |-------------|-------------|
+ | false | true | => Add Tag Propagation (send ADD classification notifications)
+ |-------------|-------------|
+ | true | false | => Remove Tag Propagation (send REMOVE classification notifications)
+ |-------------|-------------| */
+
+ if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
+ String propagationType = updatedTagPropagation ? CLASSIFICATION_PROPAGATION_ADD : CLASSIFICATION_PROPAGATION_DELETE;
+
+ createAndQueueTask(propagationType, entityVertex, classificationVertex.getIdForDisplay());
+
+ updatedTagPropagation = null;
+ }
+
// compute propagatedEntityVertices once and use it for subsequent iterations and notifications
if (updatedTagPropagation != null && currentTagPropagation != updatedTagPropagation) {
if (updatedTagPropagation) {
@@ -2340,6 +2449,63 @@ public class EntityGraphMapper {
AtlasPerfTracer.log(perf);
}
+    /**
+     * Re-applies propagation of an updated classification: looks up the vertices impacted by the
+     * classification, adds the propagation to them, and for every impacted vertex whose entity is
+     * active rewrites its classification text and sends a "classification updated" notification.
+     *
+     * @param entityGuid             GUID of the entity the classification is attached to
+     * @param classificationVertexId id of the classification vertex
+     * @param relationshipGuid       relationship GUID handed to the impacted-vertex lookup as the relationship to exclude
+     * @return GUIDs of the entities behind the impacted vertices; null when an input is empty,
+     *         a vertex cannot be found, or no propagation was added
+     * @throws AtlasBaseException wrapping any exception raised during the update
+     */
+    @GraphTransaction
+    public List<String> updateClassificationsPropagation(String entityGuid, String classificationVertexId, String relationshipGuid) throws AtlasBaseException {
+        try {
+            if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) {
+                LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
+
+            if (entityVertex == null) {
+                LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
+
+            if (classificationVertex == null) {
+                LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            List<AtlasVertex> entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertex.getIdForDisplay());
+
+            if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
+                LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no impacted vertices found!", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo);
+
+            if (CollectionUtils.isEmpty(entitiesPropagatedTo)) {
+                LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no propagations added!", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            AtlasClassification updatedClassification = entityRetriever.toAtlasClassification(classificationVertex);
+            List<String>        ret                   = new ArrayList<>(entitiesToPropagateTo.size());
+
+            for (AtlasVertex vertex : entitiesToPropagateTo) {
+                // Load the entity that belongs to THIS vertex. Using the method's entityGuid here would
+                // stamp the source entity's classification text on every impacted vertex and notify
+                // the source entity once per vertex.
+                // NOTE(review): getIdFromVertex is AtlasGraphUtilsV2's accessor for the vertex GUID - confirm.
+                String      propagatedEntityGuid = getIdFromVertex(vertex);
+                AtlasEntity entity               = instanceConverter.getAndCacheEntity(propagatedEntityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
+
+                ret.add(entity.getGuid());
+
+                if (isActive(entity)) {
+                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+
+                    entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(updatedClassification));
+                }
+            }
+
+            return ret;
+        } catch (Exception ex) {
+            // log with the task inputs before wrapping, matching propagateClassification
+            LOG.error("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): error while updating classification propagation", entityGuid, classificationVertexId, ex);
+
+            throw new AtlasBaseException(ex);
+        }
+    }
+
private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification,
AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
throws AtlasBaseException {
@@ -2387,6 +2553,50 @@ public class EntityGraphMapper {
}
}
+    /**
+     * Removes the propagation of a classification, calls updateClassificationText for the vertices
+     * returned by the handler's removeTagPropagation, and sends the "classifications deleted"
+     * notification for them.
+     *
+     * @param classificationVertexId id of the classification vertex whose propagation is removed
+     * @return GUIDs of the entities the propagated classification was removed from; null when the
+     *         id is empty, the vertex cannot be found, or no propagation was removed
+     * @throws AtlasBaseException wrapping any exception raised during removal
+     */
+    @GraphTransaction
+    public List<String> deleteClassificationPropagation(String classificationVertexId) throws AtlasBaseException {
+        try {
+            if (StringUtils.isEmpty(classificationVertexId)) {
+                LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex id is empty", classificationVertexId);
+
+                return null;
+            }
+
+            final AtlasVertex tagVertex = graph.getVertex(classificationVertexId);
+
+            if (tagVertex == null) {
+                LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex not found", classificationVertexId);
+
+                return null;
+            }
+
+            final List<AtlasVertex> affectedVertices = deleteDelegate.getHandler().removeTagPropagation(tagVertex);
+
+            if (CollectionUtils.isEmpty(affectedVertices)) {
+                return null;
+            }
+
+            final AtlasClassification tag              = entityRetriever.toAtlasClassification(tagVertex);
+            final List<AtlasEntity>   affectedEntities = updateClassificationText(tag, affectedVertices);
+
+            entityChangeNotifier.onClassificationsDeletedFromEntities(affectedEntities, Collections.singletonList(tag));
+
+            final List<String> guids = new ArrayList<>(affectedEntities.size());
+
+            for (AtlasEntity affectedEntity : affectedEntities) {
+                guids.add(affectedEntity.getGuid());
+            }
+
+            return guids;
+        } catch (Exception e) {
+            throw new AtlasBaseException(e);
+        }
+    }
+
+    /**
+     * Looks up the relationship edge by id, hands it and the relationship to the delete handler's
+     * updateTagPropagations, and then calls entityChangeNotifier.notifyPropagatedEntities().
+     *
+     * @param relationshipEdgeId id of the relationship edge, resolved through graph.getEdge
+     * @param relationship       relationship passed to the handler together with the edge
+     * @throws AtlasBaseException if the handler or the notifier fails
+     */
+    @GraphTransaction
+    public void updateTagPropagations(String relationshipEdgeId, AtlasRelationship relationship) throws AtlasBaseException {
+        deleteDelegate.getHandler().updateTagPropagations(graph.getEdge(relationshipEdgeId), relationship);
+
+        entityChangeNotifier.notifyPropagatedEntities();
+    }
+
private void validateClassificationExists(List<String> existingClassifications, List<String> suppliedClassifications) throws AtlasBaseException {
Set<String> existingNames = new HashSet<>(existingClassifications);
for (String classificationName : suppliedClassifications) {
@@ -2608,4 +2818,8 @@ public class EntityGraphMapper {
attributes.put(bmAttribute.getName(), attrValue);
}
-}
+
+ /**
+ * Delegates to the createAndQueueTask method of the handler returned by deleteDelegate.getHandler().
+ * Callers in this class pass CLASSIFICATION_PROPAGATION_ADD or CLASSIFICATION_PROPAGATION_DELETE
+ * as the task type and classificationVertex.getIdForDisplay() as the classification vertex id.
+ *
+ * @param taskType               task type constant
+ * @param entityVertex           vertex of the entity the classification is attached to
+ * @param classificationVertexId id of the classification vertex
+ */
+ private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId) {
+ // NOTE(review): the trailing null is presumably the relationship GUID - confirm against the handler's signature
+ deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null);
+ }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 8208d11..b790023 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -68,6 +68,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -79,6 +80,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
@@ -116,6 +118,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
@Component
public class EntityGraphRetriever {
@@ -501,7 +504,7 @@ public class EntityGraphRetriever {
public List<AtlasVertex> getImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude) {
List<AtlasVertex> ret = new ArrayList<>();
- traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, new HashSet<>(), ret);
+ traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, ret);
return ret;
}
@@ -509,7 +512,7 @@ public class EntityGraphRetriever {
public List<AtlasVertex> getIncludedImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude) {
List<AtlasVertex> ret = new ArrayList<>(Arrays.asList(entityVertex));
- traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, new HashSet<>(), ret);
+ traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, ret);
return ret;
}
@@ -517,20 +520,37 @@ public class EntityGraphRetriever {
public List<AtlasVertex> getImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude, String classificationId) {
List<AtlasVertex> ret = new ArrayList<>();
- traverseImpactedVertices(entityVertex, relationshipGuidToExclude, classificationId, new HashSet<>(), ret);
+ traverseImpactedVertices(entityVertex, relationshipGuidToExclude, classificationId, ret);
return ret;
}
- private void traverseImpactedVertices(AtlasVertex entityVertex, String relationshipGuidToExclude, String classificationId, Set<String> visitedVertices, List<AtlasVertex> result) {
- visitedVertices.add(entityVertex.getIdForDisplay());
+ private void traverseImpactedVertices(final AtlasVertex entityVertexStart, final String relationshipGuidToExclude,
+ final String classificationId, final List<AtlasVertex> result) {
+ Set<String> visitedVertices = new HashSet<>();
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(getTypeName(entityVertex));
- String[] tagPropagationEdges = entityType != null ? entityType.getTagPropagationEdgesArray() : null;
+ Queue<AtlasVertex> queue = new ArrayDeque<AtlasVertex>() {{ add(entityVertexStart); }};
- if (tagPropagationEdges != null) {
- Iterable<AtlasEdge> propagationEdges = entityVertex.getEdges(AtlasEdgeDirection.BOTH, tagPropagationEdges);
+ while (!queue.isEmpty()) {
+ AtlasVertex entityVertex = queue.poll();
+ String entityVertexId = entityVertex.getIdForDisplay();
+
+ if (visitedVertices.contains(entityVertexId)) {
+ LOG.info("Already visited: {}", entityVertexId);
+
+ continue;
+ }
+
+ visitedVertices.add(entityVertexId);
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(getTypeName(entityVertex));
+ String[] tagPropagationEdges = entityType != null ? entityType.getTagPropagationEdgesArray() : null;
+
+ if (tagPropagationEdges == null) {
+ continue;
+ }
+
+ Iterable<AtlasEdge> propagationEdges = entityVertex.getEdges(AtlasEdgeDirection.BOTH, tagPropagationEdges);
for (AtlasEdge propagationEdge : propagationEdges) {
if (getEdgeStatus(propagationEdge) != ACTIVE) {
continue;
@@ -540,7 +560,7 @@ public class EntityGraphRetriever {
if (tagPropagation == null || tagPropagation == NONE) {
continue;
- } else if (tagPropagation == TWO_TO_ONE) {
+ } else if (tagPropagation == TWO_TO_ONE) {
if (isOutVertex(entityVertex, propagationEdge)) {
continue;
}
@@ -558,18 +578,18 @@ public class EntityGraphRetriever {
if (classificationId != null) {
List<String> blockedClassificationIds = getBlockedClassificationIds(propagationEdge);
-
if (CollectionUtils.isNotEmpty(blockedClassificationIds) && blockedClassificationIds.contains(classificationId)) {
continue;
}
}
- AtlasVertex adjacentVertex = getOtherVertex(propagationEdge, entityVertex);
+ AtlasVertex adjacentVertex = getOtherVertex(propagationEdge, entityVertex);
+ String adjacentVertexIdForDisplay = adjacentVertex.getIdForDisplay();
- if (!visitedVertices.contains(adjacentVertex.getIdForDisplay())) {
+ if (!visitedVertices.contains(adjacentVertexIdForDisplay)) {
result.add(adjacentVertex);
- traverseImpactedVertices(adjacentVertex, relationshipGuidToExclude, classificationId, visitedVertices, result);
+ queue.add(adjacentVertex);
}
}
}
@@ -768,25 +788,32 @@ public class EntityGraphRetriever {
LOG.debug("Mapping system attributes for type {}", entity.getTypeName());
}
- entity.setGuid(getGuid(entityVertex));
- entity.setTypeName(getTypeName(entityVertex));
- entity.setStatus(GraphHelper.getStatus(entityVertex));
- entity.setVersion(GraphHelper.getVersion(entityVertex));
+ try {
+ if (entityVertex != null) {
+ entity.setGuid(getGuid(entityVertex));
+ entity.setTypeName(getTypeName(entityVertex));
+ entity.setStatus(GraphHelper.getStatus(entityVertex));
+ entity.setVersion(GraphHelper.getVersion(entityVertex));
- entity.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex));
- entity.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex));
+ entity.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex));
+ entity.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex));
- entity.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex)));
- entity.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex)));
+ entity.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex)));
+ entity.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex)));
- entity.setHomeId(GraphHelper.getHomeId(entityVertex));
+ entity.setHomeId(GraphHelper.getHomeId(entityVertex));
- entity.setIsProxy(GraphHelper.isProxy(entityVertex));
- entity.setIsIncomplete(isEntityIncomplete(entityVertex));
+ entity.setIsProxy(GraphHelper.isProxy(entityVertex));
+ entity.setIsIncomplete(isEntityIncomplete(entityVertex));
- entity.setProvenanceType(GraphHelper.getProvenanceType(entityVertex));
- entity.setCustomAttributes(getCustomAttributes(entityVertex));
- entity.setLabels(getLabels(entityVertex));
+ entity.setProvenanceType(GraphHelper.getProvenanceType(entityVertex));
+ entity.setCustomAttributes(getCustomAttributes(entityVertex));
+ entity.setLabels(getLabels(entityVertex));
+ entity.setPendingTasks(getPendingTasks(entityVertex));
+ }
+ } catch (Throwable t) {
+ LOG.warn("Got exception while mapping system attributes for type {} : ", entity.getTypeName(), t);
+ }
return entity;
}
@@ -1660,4 +1687,14 @@ public class EntityGraphRetriever {
relationship.setAttribute(attribute.getName(), attrValue);
}
}
+
+    /**
+     * Reads the values stored under PENDING_TASKS_PROPERTY_KEY on the given vertex.
+     *
+     * @param entityVertex vertex to read from
+     * @return the property values copied into a new set, or null when the vertex has none
+     */
+    private Set<String> getPendingTasks(AtlasVertex entityVertex) {
+        Collection<String> pendingTasks = entityVertex.getPropertyValues(PENDING_TASKS_PROPERTY_KEY, String.class);
+
+        if (CollectionUtils.isNotEmpty(pendingTasks)) {
+            return new HashSet<>(pendingTasks);
+        }
+
+        return null;
+    }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
index fe8699d..d6f23d6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -123,12 +123,12 @@ public class MigrationImport extends ImportStrategy {
private AtlasEntityStoreV2 createEntityStore(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
FullTextMapperV2Nop fullTextMapperV2 = new FullTextMapperV2Nop();
IAtlasEntityChangeNotifier entityChangeNotifier = new EntityChangeNotifierNop();
- DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(graph, typeRegistry);
+ DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(graph, typeRegistry, null);
AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(graph, typeRegistry, formatConverters);
AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(graph, typeRegistry, deleteDelegate, entityChangeNotifier);
- EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, graph, relationshipStore, entityChangeNotifier, instanceConverter, fullTextMapperV2);
+ EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, graph, relationshipStore, entityChangeNotifier, instanceConverter, fullTextMapperV2, null);
return new AtlasEntityStoreV2(graph, deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
new file mode 100644
index 0000000..6244b2d
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.tasks.TaskFactory;
+import org.apache.atlas.tasks.TaskManagement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link TaskFactory} for the classification-propagation task types. The factory registers itself
+ * with the injected {@link TaskManagement} on construction and maps each supported task type to
+ * the matching nested class of {@link ClassificationPropagationTasks}.
+ */
+@Component
+public class ClassificationPropagateTaskFactory implements TaskFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(ClassificationPropagateTaskFactory.class);
+
+    public static final String CLASSIFICATION_PROPAGATION_ADD                 = "CLASSIFICATION_PROPAGATION_ADD";
+    public static final String CLASSIFICATION_PROPAGATION_UPDATE              = "CLASSIFICATION_PROPAGATION_UPDATE";
+    public static final String CLASSIFICATION_PROPAGATION_DELETE              = "CLASSIFICATION_PROPAGATION_DELETE";
+    public static final String CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE = "CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE";
+
+    // Built in a static initializer (no anonymous ArrayList subclass) and wrapped read-only,
+    // because the same instance is handed to every caller of getSupportedTypes().
+    private static final List<String> SUPPORTED_TYPES;
+
+    static {
+        List<String> types = new ArrayList<>();
+
+        types.add(CLASSIFICATION_PROPAGATION_ADD);
+        types.add(CLASSIFICATION_PROPAGATION_UPDATE);
+        types.add(CLASSIFICATION_PROPAGATION_DELETE);
+        types.add(CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE);
+
+        SUPPORTED_TYPES = Collections.unmodifiableList(types);
+    }
+
+    private final AtlasGraph             graph;
+    private final EntityGraphMapper      entityGraphMapper;
+    private final DeleteHandlerDelegate  deleteDelegate;
+    private final AtlasRelationshipStore relationshipStore;
+
+    /**
+     * Stores the collaborators handed to every created task and registers this factory with
+     * {@code taskManagement}.
+     */
+    @Inject
+    public ClassificationPropagateTaskFactory(TaskManagement taskManagement, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+        this.graph             = graph;
+        this.entityGraphMapper = entityGraphMapper;
+        this.deleteDelegate    = deleteDelegate;
+        this.relationshipStore = relationshipStore;
+
+        taskManagement.addFactory(this);
+    }
+
+    /**
+     * Creates the task object for the given task definition.
+     *
+     * @param task task definition; its type selects the task class
+     * @return the task instance, or null when {@code task} or its type is null or the type is not
+     *         one of the supported types
+     */
+    @Override
+    public org.apache.atlas.tasks.AbstractTask create(AtlasTask task) {
+        // switch on a null String throws NullPointerException, so reject it the same way an unknown type is rejected
+        if (task == null || task.getType() == null) {
+            LOG.warn("Task or task type is null. The task will be ignored: {}", task);
+            return null;
+        }
+
+        String taskType = task.getType();
+        String taskGuid = task.getGuid();
+
+        switch (taskType) {
+            case CLASSIFICATION_PROPAGATION_ADD:
+                return new ClassificationPropagationTasks.Add(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+
+            case CLASSIFICATION_PROPAGATION_UPDATE:
+                return new ClassificationPropagationTasks.Update(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+
+            case CLASSIFICATION_PROPAGATION_DELETE:
+                return new ClassificationPropagationTasks.Delete(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+
+            case CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE:
+                return new ClassificationPropagationTasks.UpdateRelationship(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+
+            default:
+                LOG.warn("Type: {} - {} not found!. The task will be ignored.", taskType, taskGuid);
+                return null;
+        }
+    }
+
+    /**
+     * @return read-only list of the task types this factory can create
+     */
+    @Override
+    public List<String> getSupportedTypes() {
+        return SUPPORTED_TYPES;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
new file mode 100644
index 0000000..4fda34a
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.type.AtlasType;
+
+import java.util.Map;
+
+/**
+ * Holder for the classification-propagation task classes. Each nested class extends
+ * ClassificationTask and, in run(), reads its inputs from the task parameter map and calls the
+ * matching EntityGraphMapper method. The PARAM_* keys and the entityGraphMapper reference are not
+ * declared here; they come from the ClassificationTask hierarchy.
+ */
+public class ClassificationPropagationTasks {
+    /** Propagates a classification through EntityGraphMapper.propagateClassification. */
+    public static class Add extends ClassificationTask {
+        public Add(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+            super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+        }
+
+        @Override
+        protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+            entityGraphMapper.propagateClassification((String) parameters.get(PARAM_ENTITY_GUID),
+                                                      (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID),
+                                                      (String) parameters.get(PARAM_RELATIONSHIP_GUID));
+        }
+    }
+
+    /** Updates propagation through EntityGraphMapper.updateClassificationsPropagation. */
+    public static class Update extends ClassificationTask {
+        public Update(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+            super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+        }
+
+        @Override
+        protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+            entityGraphMapper.updateClassificationsPropagation((String) parameters.get(PARAM_ENTITY_GUID),
+                                                               (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID),
+                                                               (String) parameters.get(PARAM_RELATIONSHIP_GUID));
+        }
+    }
+
+    /** Removes propagation through EntityGraphMapper.deleteClassificationPropagation. */
+    public static class Delete extends ClassificationTask {
+        public Delete(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+            super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+        }
+
+        @Override
+        protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+            entityGraphMapper.deleteClassificationPropagation((String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID));
+        }
+    }
+
+    /** Re-evaluates propagation for a relationship through EntityGraphMapper.updateTagPropagations. */
+    public static class UpdateRelationship extends ClassificationTask {
+        public UpdateRelationship(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+            super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+        }
+
+        @Override
+        protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+            final String            edgeId           = (String) parameters.get(PARAM_RELATIONSHIP_EDGE_ID);
+            final String            relationshipJson = (String) parameters.get(PARAM_RELATIONSHIP_OBJECT);
+            final AtlasRelationship relationship     = AtlasType.fromJson(relationshipJson, AtlasRelationship.class);
+
+            entityGraphMapper.updateTagPropagations(edgeId, relationship);
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
new file mode 100644
index 0000000..369db08
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
+import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
+
+/**
+ * Base class for the deferred classification-propagation tasks.
+ *
+ * <p>{@link #perform()} validates the task definition, sets the request user to the task's
+ * creator, invokes the subclass-specific {@link #run(Map)} and records the outcome. Whenever the
+ * status is set, the task guid is also removed from the pending-tasks property of the graph
+ * element (entity vertex or relationship edge) the task refers to.
+ */
+public abstract class ClassificationTask extends AbstractTask {
+    private static final Logger LOG = LoggerFactory.getLogger(ClassificationTask.class);
+
+    protected static final String PARAM_ENTITY_GUID              = "entityGuid";
+    protected static final String PARAM_CLASSIFICATION_VERTEX_ID = "classificationVertexId";
+    protected static final String PARAM_RELATIONSHIP_GUID        = "relationshipGuid";
+    protected static final String PARAM_RELATIONSHIP_OBJECT      = "relationshipObject";
+    protected static final String PARAM_RELATIONSHIP_EDGE_ID     = "relationshipEdgeId";
+
+    protected final AtlasGraph             graph;
+    protected final EntityGraphMapper      entityGraphMapper;
+    protected final DeleteHandlerDelegate  deleteDelegate;
+    protected final AtlasRelationshipStore relationshipStore;
+
+    public ClassificationTask(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+        super(task);
+
+        this.graph             = graph;
+        this.entityGraphMapper = entityGraphMapper;
+        this.deleteDelegate    = deleteDelegate;
+        this.relationshipStore = relationshipStore;
+    }
+
+    /**
+     * Executes the task.
+     *
+     * @return {@code FAILED} (without running anything) when the task has no parameters or no
+     *         creator; otherwise the task status after {@link #run(Map)} completed
+     * @throws Exception whatever {@link #run(Map)} threw, after the status was set to {@code FAILED}
+     */
+    @Override
+    public AtlasTask.Status perform() throws Exception {
+        Map<String, Object> params = getTaskDef().getParameters();
+
+        if (MapUtils.isEmpty(params)) {
+            LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid());
+
+            return FAILED;
+        }
+
+        String userName = getTaskDef().getCreatedBy();
+
+        if (StringUtils.isEmpty(userName)) {
+            LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid());
+
+            return FAILED;
+        }
+
+        RequestContext.get().setUser(userName, null);
+
+        try {
+            run(params);
+
+            setStatus(COMPLETE);
+        } catch (Exception e) {
+            LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
+
+            setStatus(FAILED);
+
+            throw e;
+        } finally {
+            // commit on both paths so the status / pending-task bookkeeping done above is persisted
+            graph.commit();
+        }
+
+        return getStatus();
+    }
+
+    /**
+     * Builds the parameter map for tasks that operate on an entity/classification pair.
+     */
+    public static Map<String, Object> toParameters(String entityGuid, String classificationVertexId, String relationshipGuid) {
+        // plain HashMap instead of double-brace initialization: no anonymous subclass per call
+        Map<String, Object> ret = new HashMap<>();
+
+        ret.put(PARAM_ENTITY_GUID, entityGuid);
+        ret.put(PARAM_CLASSIFICATION_VERTEX_ID, classificationVertexId);
+        ret.put(PARAM_RELATIONSHIP_GUID, relationshipGuid);
+
+        return ret;
+    }
+
+    /**
+     * Builds the parameter map for relationship-update tasks; the relationship is stored as JSON.
+     */
+    public static Map<String, Object> toParameters(String relationshipEdgeId, AtlasRelationship relationship) {
+        Map<String, Object> ret = new HashMap<>();
+
+        ret.put(PARAM_RELATIONSHIP_EDGE_ID, relationshipEdgeId);
+        ret.put(PARAM_RELATIONSHIP_OBJECT, AtlasType.toJson(relationship));
+
+        return ret;
+    }
+
+    /**
+     * Sets the task status and removes the task guid from the pending-tasks property of the
+     * relationship edge (relationship-update tasks) or entity vertex (all other tasks).
+     */
+    @Override
+    protected void setStatus(AtlasTask.Status status) {
+        super.setStatus(status);
+
+        Map<String, Object> params = getTaskDef().getParameters();
+
+        if (MapUtils.isEmpty(params)) {
+            LOG.warn("Task: {}: No parameters; pending-task reference not removed!", getTaskGuid());
+
+            return;
+        }
+
+        // remove pending task guid from entity vertex or relationship edge
+        AtlasElement element;
+
+        // equals(), not ==: a task type read back from the graph is a different String instance
+        if (CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE.equals(getTaskType())) {
+            element = graph.getEdge((String) params.get(PARAM_RELATIONSHIP_EDGE_ID));
+        } else {
+            element = AtlasGraphUtilsV2.findByGuid((String) params.get(PARAM_ENTITY_GUID));
+        }
+
+        if (element == null) {
+            // the entity/edge may already be gone; do not mask the task outcome with an NPE
+            LOG.warn("Task: {}: Associated graph element not found; pending-task reference not removed!", getTaskGuid());
+
+            return;
+        }
+
+        element.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, getTaskGuid());
+    }
+
+    /**
+     * Task-specific work, invoked by {@link #perform()} with the (non-empty) task parameters.
+     */
+    protected abstract void run(Map<String, Object> parameters) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java b/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java
new file mode 100644
index 0000000..33e1d6a
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.model.tasks.AtlasTask;
+
+import static org.apache.atlas.model.tasks.AtlasTask.Status;
+
+/**
+ * Wraps an {@link AtlasTask} definition and supplies the bookkeeping around one execution
+ * attempt; subclasses implement the actual work in {@link #perform()}.
+ */
+public abstract class AbstractTask {
+    private final AtlasTask task;
+
+    public AbstractTask(AtlasTask task) {
+        this.task = task;
+    }
+
+    /**
+     * Performs the task's work.
+     *
+     * @return the resulting status
+     * @throws Exception on failure; {@link #run()} records it on the task definition
+     */
+    public abstract Status perform() throws Exception;
+
+    /**
+     * Runs {@link #perform()}. If it throws, the task is put back to pending, the exception
+     * message is stored, the attempt count is incremented and the exception is rethrown.
+     * {@code task.end()} is invoked in every case.
+     */
+    public void run() throws Exception {
+        try {
+            perform();
+        } catch (Exception e) {
+            task.setStatusPending();
+            task.setErrorMessage(e.getMessage());
+            task.incrementAttemptCount();
+
+            throw e;
+        } finally {
+            task.end();
+        }
+    }
+
+    public Status getStatus() {
+        return task.getStatus();
+    }
+
+    public String getTaskGuid() {
+        return task.getGuid();
+    }
+
+    public String getTaskType() {
+        return task.getType();
+    }
+
+    protected void setStatus(Status status) {
+        task.setStatus(status);
+    }
+
+    protected AtlasTask getTaskDef() {
+        return task;
+    }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java
new file mode 100644
index 0000000..81957d8
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Submits {@link AtlasTask}s to a single daemon worker thread, where each one is executed by a
+ * {@link TaskConsumer}.
+ */
+public class TaskExecutor {
+    private static final Logger     LOG              = LoggerFactory.getLogger(TaskExecutor.class);
+    private static final TaskLogger TASK_LOG         = TaskLogger.getLogger();
+    private static final String     TASK_NAME_FORMAT = "atlas-task-%d-";
+
+    private final TaskRegistry              registry;
+    private final Map<String, TaskFactory>  taskTypeFactoryMap;
+    private final TaskManagement.Statistics statistics;
+    private final ExecutorService           executorService;
+
+    public TaskExecutor(TaskRegistry registry, Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics) {
+        this.registry           = registry;
+        this.taskTypeFactoryMap = taskTypeFactoryMap;
+        this.statistics         = statistics;
+        this.executorService    = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+                                                                        .setDaemon(true)
+                                                                        .setNameFormat(TASK_NAME_FORMAT + Thread.currentThread().getName())
+                                                                        .build());
+    }
+
+    /**
+     * Logs and queues every non-null task for execution on the worker thread.
+     */
+    public void addAll(List<AtlasTask> tasks) {
+        for (AtlasTask task : tasks) {
+            if (task == null) {
+                continue;
+            }
+
+            TASK_LOG.log(task);
+
+            this.executorService.submit(new TaskConsumer(task, this.registry, this.taskTypeFactoryMap, this.statistics));
+        }
+    }
+
+    @VisibleForTesting
+    void waitUntilDone() throws InterruptedException {
+        Thread.sleep(5000);
+    }
+
+    /**
+     * Executes one task: looks up its vertex, skips it when it is missing, already complete or
+     * out of attempts, otherwise runs it and persists the resulting status.
+     */
+    static class TaskConsumer implements Runnable {
+        private static final int MAX_ATTEMPT_COUNT = 3;
+
+        private final Map<String, TaskFactory>  taskTypeFactoryMap;
+        private final TaskRegistry              registry;
+        private final TaskManagement.Statistics statistics;
+        private final AtlasTask                 task;
+
+        public TaskConsumer(AtlasTask task, TaskRegistry registry, Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics) {
+            this.task               = task;
+            this.registry           = registry;
+            this.taskTypeFactoryMap = taskTypeFactoryMap;
+            this.statistics         = statistics;
+        }
+
+        @Override
+        public void run() {
+            // check before the first dereference; previously task.getGuid() ran ahead of the null check
+            if (task == null) {
+                LOG.warn("Task not scheduled as it was null!");
+
+                return;
+            }
+
+            AtlasVertex taskVertex  = null;
+            boolean     interrupted = false;
+
+            try {
+                taskVertex = registry.getVertex(task.getGuid());
+
+                if (taskVertex == null || task.getStatus() == AtlasTask.Status.COMPLETE) {
+                    TASK_LOG.warn("Task not scheduled as it was not found or status was COMPLETE! Task: {}", task);
+
+                    return;
+                }
+
+                statistics.increment(1);
+
+                if (task.getAttemptCount() >= MAX_ATTEMPT_COUNT) {
+                    TASK_LOG.warn("Max retry count for task exceeded! Skipping! Task: {}", task);
+
+                    return;
+                }
+
+                performTask(taskVertex, task);
+            } catch (InterruptedException exception) {
+                interrupted = true;
+
+                registry.updateStatus(taskVertex, task);
+
+                TASK_LOG.error("{}: Interrupted!", task, exception);
+
+                statistics.error();
+            } catch (Exception exception) {
+                task.updateStatusFromAttemptCount();
+
+                registry.updateStatus(taskVertex, task);
+
+                TASK_LOG.error("Error executing task. Please perform the operation again! Task: {}", task, exception);
+
+                statistics.error();
+            } finally {
+                try {
+                    this.registry.commit();
+
+                    TASK_LOG.log(task);
+                } finally {
+                    // restore the flag only after the commit so the commit itself is not disturbed
+                    if (interrupted) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+
+        /**
+         * Creates the runnable task through the factory registered for its type, runs it, then
+         * removes the task vertex and counts a success. Does nothing but log when no factory is
+         * registered for the type.
+         */
+        private void performTask(AtlasVertex taskVertex, AtlasTask task) throws Exception {
+            TaskFactory factory = taskTypeFactoryMap.get(task.getType());
+
+            if (factory == null) {
+                LOG.error("taskTypeFactoryMap does not contain task of type: {}", task.getType());
+
+                return;
+            }
+
+            AbstractTask runnableTask = factory.create(task);
+
+            runnableTask.run();
+
+            registry.deleteComplete(taskVertex, task);
+
+            statistics.successPrint();
+        }
+    }
+
+    /**
+     * Thin wrapper around the "TASKS" logger that renders tasks as JSON.
+     *
+     * <p>For {@link #warn(String, AtlasTask)} and {@link #error(String, AtlasTask, Exception)}
+     * the message must contain a <code>{}</code> placeholder for the task JSON; without one the
+     * JSON argument is dropped by SLF4J.
+     */
+    static class TaskLogger {
+        private static final Logger LOG = LoggerFactory.getLogger("TASKS");
+
+        public static TaskLogger getLogger() {
+            return new TaskLogger();
+        }
+
+        public void info(String message) {
+            LOG.info(message);
+        }
+
+        public void log(AtlasTask task) {
+            LOG.info(AtlasType.toJson(task));
+        }
+
+        public void warn(String message, AtlasTask task) {
+            LOG.warn(message, AtlasType.toJson(task));
+        }
+
+        public void error(String s, AtlasTask task, Exception exception) {
+            LOG.error(s, AtlasType.toJson(task), exception);
+        }
+    }
+}
\ No newline at end of file
diff --git a/webapp/src/main/java/org/apache/atlas/BeanUtil.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java
similarity index 56%
copy from webapp/src/main/java/org/apache/atlas/BeanUtil.java
copy to repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java
index ef5a741..47ce1fc 100644
--- a/webapp/src/main/java/org/apache/atlas/BeanUtil.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java
@@ -15,24 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.atlas.tasks;
+import org.apache.atlas.model.tasks.AtlasTask;
-package org.apache.atlas;
+import java.util.List;
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.stereotype.Component;
+public interface TaskFactory {
+ /**
+ * Creates a concrete task using the task definition.
+ *
+ * @param atlasTask the task definition to wrap
+ * @return the runnable task created for the definition
+ */
+ AbstractTask create(AtlasTask atlasTask);
-@Component
-public class BeanUtil implements ApplicationContextAware {
- private static ApplicationContext context;
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- context = applicationContext;
- }
-
- public static <T> T getBean(Class<T> beanClass) {
- return context.getBean(beanClass);
- }
+ /**
+ * Lists the task type names this factory handles; TaskManagement registers the factory under
+ * each returned name, and TaskExecutor looks the factory up by a task's type.
+ *
+ * @return the supported task type names
+ */
+ List<String> getSupportedTypes();
}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
new file mode 100644
index 0000000..264aa8c
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.service.Service;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Service facade for deferred tasks: registers {@link TaskFactory} instances by task type,
+ * creates/reads/deletes task definitions through the {@link TaskRegistry}, and dispatches
+ * tasks to a {@link TaskExecutor}.
+ */
+@Component
+@Order(7)
+public class TaskManagement implements Service, ActiveStateChangeHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(TaskManagement.class);
+
+    // NOTE(review): every thread that dispatches tasks gets its own TaskExecutor (and thus its own
+    // worker thread); nothing visible here shuts those executors down - confirm this is intended.
+    private final ThreadLocal<TaskExecutor> taskExecutorThreadLocal = new ThreadLocal<>();
+    private final Configuration             configuration;
+    private final TaskRegistry              registry;
+    private final Statistics                statistics;
+    private final Map<String, TaskFactory>  taskTypeFactoryMap;
+
+    @Inject
+    public TaskManagement(Configuration configuration, TaskRegistry taskRegistry) {
+        this.configuration      = configuration;
+        this.registry           = taskRegistry;
+        this.statistics         = new Statistics();
+        this.taskTypeFactoryMap = new HashMap<>();
+    }
+
+    @VisibleForTesting
+    TaskManagement(Configuration configuration, TaskRegistry taskRegistry, TaskFactory taskFactory) {
+        this.configuration      = configuration;
+        this.registry           = taskRegistry;
+        this.statistics         = new Statistics();
+        this.taskTypeFactoryMap = new HashMap<>();
+
+        createTaskTypeFactoryMap(taskTypeFactoryMap, taskFactory);
+    }
+
+    @Override
+    public void start() throws AtlasException {
+        if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) {
+            startInternal();
+        } else {
+            // message previously said "deferring patches", copied from another handler
+            LOG.info("TaskManagement.start(): deferring task management until instance activation");
+        }
+    }
+
+    @Override
+    public void stop() throws AtlasException {
+        LOG.info("TaskManagement: Stopped!");
+    }
+
+    @Override
+    public void instanceIsActive() throws AtlasException {
+        LOG.info("==> TaskManagement.instanceIsActive()");
+
+        startInternal();
+
+        LOG.info("<== TaskManagement.instanceIsActive()");
+    }
+
+    @Override
+    public void instanceIsPassive() throws AtlasException {
+        LOG.info("TaskManagement.instanceIsPassive(): no action needed");
+    }
+
+    @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.TASK_MANAGEMENT.getOrder();
+    }
+
+    /**
+     * Registers the factory for every task type it reports as supported.
+     */
+    public void addFactory(TaskFactory taskFactory) {
+        createTaskTypeFactoryMap(this.taskTypeFactoryMap, taskFactory);
+    }
+
+    public AtlasTask createTask(String taskType, String createdBy, Map<String, Object> parameters) {
+        return this.registry.createVertex(taskType, createdBy, parameters);
+    }
+
+    public List<AtlasTask> getAll() {
+        return this.registry.getAll();
+    }
+
+    /**
+     * Dispatches the given tasks for execution; a null or empty list is ignored.
+     */
+    public void addAll(List<AtlasTask> tasks) {
+        if (CollectionUtils.isEmpty(tasks)) {
+            return;
+        }
+
+        dispatchTasks(tasks);
+    }
+
+    /**
+     * @return the task with the given guid, or {@code null} when the registry has none
+     * @throws AtlasBaseException wrapping any failure raised by the registry
+     */
+    public AtlasTask getByGuid(String guid) throws AtlasBaseException {
+        try {
+            return this.registry.getById(guid);
+        } catch (Exception exception) {
+            LOG.error("Error: getByGuid: {}", guid, exception);
+
+            throw new AtlasBaseException(exception);
+        }
+    }
+
+    /**
+     * @return the tasks found for the given guids, in input order; guids without a task are
+     *         skipped and a null or empty input yields an empty list
+     */
+    public List<AtlasTask> getByGuids(List<String> guids) throws AtlasBaseException {
+        List<AtlasTask> ret = new ArrayList<>();
+
+        if (CollectionUtils.isEmpty(guids)) {
+            return ret;
+        }
+
+        for (String guid : guids) {
+            AtlasTask task = getByGuid(guid);
+
+            if (task != null) {
+                ret.add(task);
+            }
+        }
+
+        return ret;
+    }
+
+    public void deleteByGuid(String guid) throws AtlasBaseException {
+        try {
+            this.registry.deleteByGuid(guid);
+        } catch (Exception exception) {
+            throw new AtlasBaseException(exception);
+        }
+    }
+
+    public void deleteByGuids(List<String> guids) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(guids)) {
+            return;
+        }
+
+        for (String guid : guids) {
+            // same error handling as the single-guid variant
+            deleteByGuid(guid);
+        }
+    }
+
+    private void dispatchTasks(List<AtlasTask> tasks) {
+        if (CollectionUtils.isEmpty(tasks)) {
+            return;
+        }
+
+        TaskExecutor executor = this.taskExecutorThreadLocal.get();
+
+        if (executor == null) {
+            executor = new TaskExecutor(registry, taskTypeFactoryMap, statistics);
+
+            this.taskExecutorThreadLocal.set(executor);
+        }
+
+        executor.addAll(tasks);
+
+        this.statistics.print();
+    }
+
+    private void startInternal() {
+        if (!AtlasConfiguration.TASKS_USE_ENABLED.getBoolean()) {
+            return;
+        }
+
+        LOG.info("TaskManagement: Started!");
+    }
+
+    /**
+     * Loads the pending tasks from the registry and dispatches them; no-op when tasks are disabled.
+     */
+    public void queuePendingTasks() {
+        if (!AtlasConfiguration.TASKS_USE_ENABLED.getBoolean()) {
+            return;
+        }
+
+        List<AtlasTask> pendingTasks = this.registry.getPendingTasks();
+
+        LOG.info("TaskManagement: Found: {}: Tasks in pending state.", pendingTasks.size());
+
+        addAll(pendingTasks);
+    }
+
+    @VisibleForTesting
+    static Map<String, TaskFactory> createTaskTypeFactoryMap(Map<String, TaskFactory> taskTypeFactoryMap, TaskFactory factory) {
+        List<String> supportedTypes = factory.getSupportedTypes();
+
+        if (CollectionUtils.isEmpty(supportedTypes)) {
+            LOG.warn("{}: Supported types returned empty!", factory.getClass());
+
+            return taskTypeFactoryMap;
+        }
+
+        for (String type : supportedTypes) {
+            taskTypeFactoryMap.put(type, factory);
+        }
+
+        return taskTypeFactoryMap;
+    }
+
+    /**
+     * Counters for processed tasks, reported through the task logger at most once per
+     * {@code REPORT_FREQUENCY} milliseconds.
+     */
+    static class Statistics {
+        private static final TaskExecutor.TaskLogger logger           = TaskExecutor.TaskLogger.getLogger();
+        private static final long                    REPORT_FREQUENCY = 30000L;
+
+        private final AtomicInteger total               = new AtomicInteger(0);
+        private final AtomicInteger countSinceLastCheck = new AtomicInteger(0);
+        private final AtomicInteger totalWithErrors     = new AtomicInteger(0);
+        private final AtomicInteger totalSucceed        = new AtomicInteger(0);
+        // volatile: print() is reached from the dispatching thread and from executor threads
+        private volatile long       lastCheckTime       = System.currentTimeMillis();
+
+        public void error() {
+            this.countSinceLastCheck.incrementAndGet();
+            this.totalWithErrors.incrementAndGet();
+        }
+
+        public void success() {
+            this.countSinceLastCheck.incrementAndGet();
+            this.totalSucceed.incrementAndGet();
+        }
+
+        public void increment() {
+            increment(1);
+        }
+
+        public void increment(int delta) {
+            this.total.addAndGet(delta);
+            this.countSinceLastCheck.addAndGet(delta);
+        }
+
+        public void print() {
+            long now  = System.currentTimeMillis();
+            long diff = now - this.lastCheckTime;
+
+            if (diff < REPORT_FREQUENCY) {
+                return;
+            }
+
+            logger.info(String.format("TaskManagement: Processing stats: total=%d, sinceLastStatsReport=%d completedWithErrors=%d, succeded=%d",
+                                      this.total.get(), this.countSinceLastCheck.getAndSet(0),
+                                      this.totalWithErrors.get(), this.totalSucceed.get()));
+            this.lastCheckTime = now;
+        }
+
+        public void successPrint() {
+            success();
+            print();
+        }
+
+        @VisibleForTesting
+        int getTotal() {
+            return this.total.get();
+        }
+
+        @VisibleForTesting
+        int getTotalSuccess() {
+            return this.totalSucceed.get();
+        }
+
+        @VisibleForTesting
+        int getTotalError() {
+            return this.totalWithErrors.get();
+        }
+    }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
new file mode 100644
index 0000000..fae8a4f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.utils.AtlasJson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.Constants.TASK_GUID;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+
+/**
+ * Graph-backed store for {@link AtlasTask} definitions: each task is kept as one vertex whose
+ * properties mirror the task's fields.
+ */
+@Component
+public class TaskRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
+
+    private final AtlasGraph graph;
+
+    @Inject
+    public TaskRegistry(AtlasGraph graph) {
+        this.graph = graph;
+    }
+
+    /**
+     * Creates a vertex for the task and returns the task as read back from that vertex.
+     */
+    @GraphTransaction
+    public AtlasTask save(AtlasTask task) {
+        AtlasVertex vertex = createVertex(task);
+
+        return toAtlasTask(vertex);
+    }
+
+    /**
+     * @return the tasks in PENDING status ordered by creation time; on a query failure the error
+     *         is logged and whatever was collected so far (possibly nothing) is returned
+     */
+    @GraphTransaction
+    public List<AtlasTask> getPendingTasks() {
+        List<AtlasTask> ret = new ArrayList<>();
+
+        try {
+            AtlasGraphQuery query = graph.query()
+                                         .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+                                         .has(Constants.TASK_STATUS, AtlasTask.Status.PENDING)
+                                         .orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
+
+            Iterator<AtlasVertex> results = query.vertices().iterator();
+
+            while (results.hasNext()) {
+                AtlasVertex vertex = results.next();
+
+                ret.add(toAtlasTask(vertex));
+            }
+        } catch (Exception exception) {
+            LOG.error("Error fetching pending tasks!", exception);
+        } finally {
+            graph.commit();
+        }
+
+        return ret;
+    }
+
+    /**
+     * Copies attempt count, status and error message from the task onto its vertex and stamps the
+     * update time. Does nothing when either argument is null.
+     */
+    @GraphTransaction
+    public void updateStatus(AtlasVertex taskVertex, AtlasTask task) {
+        if (taskVertex == null || task == null) {
+            return;
+        }
+
+        setEncodedProperty(taskVertex, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
+        setEncodedProperty(taskVertex, Constants.TASK_STATUS, task.getStatus().toString());
+        setEncodedProperty(taskVertex, Constants.TASK_UPDATED_TIME, System.currentTimeMillis());
+        setEncodedProperty(taskVertex, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());
+    }
+
+    /**
+     * Removes the first task vertex found for the guid, if any.
+     *
+     * @throws AtlasBaseException wrapping any failure during lookup or removal
+     */
+    @GraphTransaction
+    public void deleteByGuid(String guid) throws AtlasBaseException {
+        try {
+            AtlasGraphQuery query = graph.query()
+                                         .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+                                         .has(TASK_GUID, guid);
+
+            Iterator<AtlasVertex> results = query.vertices().iterator();
+
+            if (results.hasNext()) {
+                graph.removeVertex(results.next());
+            }
+        } catch (Exception exception) {
+            LOG.error("Error: deletingByGuid: {}", guid, exception);
+
+            throw new AtlasBaseException(exception);
+        }
+    }
+
+    /**
+     * Writes the task's final status to the vertex and then removes the vertex.
+     */
+    @GraphTransaction
+    public void deleteComplete(AtlasVertex taskVertex, AtlasTask task) {
+        updateStatus(taskVertex, task);
+
+        deleteVertex(taskVertex);
+    }
+
+    /**
+     * @return the task stored under the guid, or {@code null} when there is none
+     */
+    @GraphTransaction
+    public AtlasTask getById(String guid) {
+        AtlasGraphQuery query = graph.query()
+                                     .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+                                     .has(TASK_GUID, guid);
+
+        Iterator<AtlasVertex> results = query.vertices().iterator();
+
+        return results.hasNext() ? toAtlasTask(results.next()) : null;
+    }
+
+    /**
+     * @return the first vertex carrying the given task guid, or {@code null} when there is none
+     */
+    @GraphTransaction
+    public AtlasVertex getVertex(String taskGuid) {
+        AtlasGraphQuery query = graph.query().has(Constants.TASK_GUID, taskGuid);
+
+        Iterator<AtlasVertex> results = query.vertices().iterator();
+
+        return results.hasNext() ? results.next() : null;
+    }
+
+    /**
+     * @return all stored tasks ordered by creation time
+     */
+    @GraphTransaction
+    public List<AtlasTask> getAll() {
+        List<AtlasTask> ret   = new ArrayList<>();
+        AtlasGraphQuery query = graph.query()
+                                     .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+                                     .orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
+
+        Iterator<AtlasVertex> results = query.vertices().iterator();
+
+        while (results.hasNext()) {
+            ret.add(toAtlasTask(results.next()));
+        }
+
+        return ret;
+    }
+
+    public void commit() {
+        this.graph.commit();
+    }
+
+    /**
+     * Creates a new task definition and its vertex.
+     *
+     * @return the newly created task definition (not re-read from the graph)
+     */
+    public AtlasTask createVertex(String taskType, String createdBy, Map<String, Object> parameters) {
+        AtlasTask ret = new AtlasTask(taskType, createdBy, parameters);
+
+        createVertex(ret);
+
+        return ret;
+    }
+
+    private void deleteVertex(AtlasVertex taskVertex) {
+        if (taskVertex == null) {
+            return;
+        }
+
+        graph.removeVertex(taskVertex);
+    }
+
+    /**
+     * Builds an {@link AtlasTask} from the vertex properties. Time and attempt-count properties
+     * that are absent on the vertex are left at the task's defaults instead of failing on unboxing.
+     */
+    private AtlasTask toAtlasTask(AtlasVertex v) {
+        AtlasTask ret = new AtlasTask();
+
+        ret.setGuid(v.getProperty(Constants.TASK_GUID, String.class));
+        ret.setType(v.getProperty(Constants.TASK_TYPE, String.class));
+        ret.setStatus(v.getProperty(Constants.TASK_STATUS, String.class));
+        ret.setCreatedBy(v.getProperty(Constants.TASK_CREATED_BY, String.class));
+
+        Long createdTime = v.getProperty(Constants.TASK_CREATED_TIME, Long.class);
+        if (createdTime != null) {
+            ret.setCreatedTime(new Date(createdTime));
+        }
+
+        Long updatedTime = v.getProperty(Constants.TASK_UPDATED_TIME, Long.class);
+        if (updatedTime != null) {
+            ret.setUpdatedTime(new Date(updatedTime));
+        }
+
+        Long startTime = v.getProperty(Constants.TASK_START_TIME, Long.class);
+        if (startTime != null) {
+            ret.setStartTime(new Date(startTime));
+        }
+
+        Long endTime = v.getProperty(Constants.TASK_END_TIME, Long.class);
+        if (endTime != null) {
+            ret.setEndTime(new Date(endTime));
+        }
+
+        String parametersJson = v.getProperty(Constants.TASK_PARAMETERS, String.class);
+        ret.setParameters(AtlasType.fromJson(parametersJson, Map.class));
+
+        Integer attemptCount = v.getProperty(Constants.TASK_ATTEMPT_COUNT, Integer.class);
+        if (attemptCount != null) {
+            ret.setAttemptCount(attemptCount);
+        }
+
+        ret.setErrorMessage(v.getProperty(Constants.TASK_ERROR_MESSAGE, String.class));
+
+        return ret;
+    }
+
+    /**
+     * Adds a vertex to the graph and writes every field of the task onto it; start and end time
+     * are written only when set.
+     */
+    private AtlasVertex createVertex(AtlasTask task) {
+        AtlasVertex ret = graph.addVertex();
+
+        setEncodedProperty(ret, Constants.TASK_GUID, task.getGuid());
+        setEncodedProperty(ret, Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME);
+        setEncodedProperty(ret, Constants.TASK_STATUS, task.getStatus().toString());
+        setEncodedProperty(ret, Constants.TASK_TYPE, task.getType());
+        setEncodedProperty(ret, Constants.TASK_CREATED_BY, task.getCreatedBy());
+        setEncodedProperty(ret, Constants.TASK_CREATED_TIME, task.getCreatedTime());
+        setEncodedProperty(ret, Constants.TASK_UPDATED_TIME, task.getUpdatedTime());
+
+        if (task.getStartTime() != null) {
+            setEncodedProperty(ret, Constants.TASK_START_TIME, task.getStartTime().getTime());
+        }
+
+        if (task.getEndTime() != null) {
+            setEncodedProperty(ret, Constants.TASK_END_TIME, task.getEndTime().getTime());
+        }
+
+        setEncodedProperty(ret, Constants.TASK_PARAMETERS, AtlasJson.toJson(task.getParameters()));
+        setEncodedProperty(ret, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
+        setEncodedProperty(ret, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());
+
+        return ret;
+    }
+}
\ No newline at end of file
diff --git a/webapp/src/main/java/org/apache/atlas/BeanUtil.java b/repository/src/main/java/org/apache/atlas/util/BeanUtil.java
similarity index 97%
rename from webapp/src/main/java/org/apache/atlas/BeanUtil.java
rename to repository/src/main/java/org/apache/atlas/util/BeanUtil.java
index ef5a741..15704bc 100644
--- a/webapp/src/main/java/org/apache/atlas/BeanUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/BeanUtil.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.atlas;
+package org.apache.atlas.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java
index 71b0a4a..8dda208 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -64,9 +64,11 @@ import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.service.Service;
import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.atlas.util.SearchTracker;
@@ -183,7 +185,11 @@ public class TestModules {
// Glossary related bindings
bind(GlossaryService.class).asEagerSingleton();
- final GraphTransactionInterceptor graphTransactionInterceptor = new GraphTransactionInterceptor(new AtlasGraphProvider().get());
+ // TaskManagement
+ bind(TaskManagement.class).asEagerSingleton();
+ bind(ClassificationPropagateTaskFactory.class).asEagerSingleton();
+
+ final GraphTransactionInterceptor graphTransactionInterceptor = new GraphTransactionInterceptor(new AtlasGraphProvider().get(), null);
requestInjection(graphTransactionInterceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), graphTransactionInterceptor);
}
diff --git a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
index 9846d43..c277806 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
@@ -152,7 +152,7 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
- assertEquals(entityHeaders.size(), 1);
+ assertEquals(entityHeaders.size(), 11);
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
new file mode 100644
index 0000000..e309a76
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.tagpropagation;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.exception.EntityNotFoundException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.AtlasTestBase;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.atlas.repository.impexp.ZipFileResourceTestUtils;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.tasks.TaskManagement;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.lang.StringUtils;
+import org.testng.SkipException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
+import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests classification propagation through {@link EntityGraphMapper} with the
+ * task-based mode switched on via {@code setTasksUseFlag(true)}.
+ *
+ * <p>{@link #setup()} loads the base, fs and hive models, registers the
+ * {@code tagX} / {@code tagY} classification defs and imports
+ * {@code tag-propagation-data.zip}. The {@code add}, {@code update} and
+ * {@code delete} tests are chained with {@code dependsOnMethods}: {@code add}
+ * attaches both tags, {@code update} changes {@code tagY} and {@code delete}
+ * removes {@code tagX}.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
+    private static final String IMPORT_FILE = "tag-propagation-data.zip";
+
+    // GUID handed to entityStore.getById(); presumably an hdfs_path entity shipped
+    // in IMPORT_FILE -- TODO confirm against the archive.
+    private static final String HDFS_PATH_EMPLOYEES = "a3955120-ac17-426f-a4af-972ec8690e5f";
+
+    // Names of the classification defs created in loadSampleClassificationDefs().
+    private static final String TAG_NAME_X = "tagX";
+    private static final String TAG_NAME_Y = "tagY";
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasEntityStore entityStore;
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    private EntityGraphMapper entityGraphMapper;
+
+    @Inject
+    private TaskManagement tasksManagement;
+
+    /**
+     * Resets the request context, initializes the base test, starts task management,
+     * turns on the tasks flag of the mapper and loads models plus test data.
+     */
+    @BeforeClass
+    public void setup() throws Exception {
+        RequestContext.clear();
+
+        super.initialize();
+
+        this.tasksManagement.start();
+        entityGraphMapper.setTasksUseFlag(true);
+
+        loadModelFilesAndImportTestData();
+    }
+
+    /**
+     * Loads the type models, creates the sample classification defs and runs the import.
+     *
+     * @throws SkipException if loading or importing fails; the original exception is
+     *                       attached as the cause so the reason shows up in the report
+     */
+    private void loadModelFilesAndImportTestData() {
+        try {
+            loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
+            loadModelFromJson("1000-Hadoop/1020-fs_model.json", typeDefStore, typeRegistry);
+            loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
+
+            loadSampleClassificationDefs();
+
+            runImportWithNoParameters(importService, getZipSource(IMPORT_FILE));
+        } catch (AtlasBaseException | IOException e) {
+            throw new SkipException("Model loading failed!", e);
+        }
+    }
+
+    /** Registers the {@code tagX} and {@code tagY} classification defs in the type-def store. */
+    private void loadSampleClassificationDefs() throws AtlasBaseException {
+        AtlasClassificationDef tagX = new AtlasClassificationDef(TAG_NAME_X);
+        AtlasClassificationDef tagY = new AtlasClassificationDef(TAG_NAME_Y);
+
+        typeDefStore.createTypesDef(new AtlasTypesDef(Collections.emptyList(), Collections.emptyList(),
+                                                      Arrays.asList(tagX, tagY),
+                                                      Collections.emptyList(), Collections.emptyList()));
+    }
+
+    /**
+     * Opens a test resource through {@link ZipFileResourceTestUtils#getFileInputStream(String)}.
+     *
+     * @param fileName name of the resource to open
+     * @return stream over the resource
+     * @throws IOException if the resource cannot be opened
+     */
+    public static InputStream getZipSource(String fileName) throws IOException {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
+    }
+
+    /**
+     * Calls the propagation entry points with null, unknown and empty arguments.
+     *
+     * <p>NOTE(review): the try block does not fail when no exception is thrown, and
+     * the second call is skipped if the first one throws -- confirm whether each call
+     * should be asserted on its own.
+     */
+    @Test
+    public void parameterValidation() throws AtlasBaseException {
+        try {
+            entityGraphMapper.propagateClassification(null, null, null);
+            entityGraphMapper.propagateClassification("unknown", "abcd", "xyz");
+        } catch (AtlasBaseException e) {
+            // When an exception is raised it must wrap EntityNotFoundException.
+            assertNotNull(e.getCause());
+            assertTrue(e.getCause() instanceof EntityNotFoundException);
+        }
+
+        List<String> ret = entityGraphMapper.propagateClassification(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
+        assertNull(ret);
+
+        ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
+        assertNull(ret);
+
+        ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY);
+        assertNull(ret);
+
+        AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
+        ret = entityGraphMapper.propagateClassification(hdfs_employees.getGuid(), StringUtils.EMPTY, StringUtils.EMPTY);
+        assertNull(ret);
+    }
+
+    /**
+     * Attaches {@code tagX} (propagate = true) and {@code tagY} (propagate = false),
+     * checks that the entity reports pending tasks and then runs the propagation directly.
+     */
+    @Test
+    public void add() throws AtlasBaseException {
+        AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
+
+        AtlasClassification tagX = new AtlasClassification(TAG_NAME_X);
+        tagX.setEntityGuid(hdfs_employees.getGuid());
+        tagX.setPropagate(true);
+
+        AtlasClassification tagY = new AtlasClassification(TAG_NAME_Y);
+        tagY.setEntityGuid(hdfs_employees.getGuid());
+        tagY.setPropagate(false);
+
+        entityStore.addClassification(Collections.singletonList(HDFS_PATH_EMPLOYEES), tagX);
+        entityStore.addClassification(Collections.singletonList(HDFS_PATH_EMPLOYEES), tagY);
+
+        // Check each vertex before it is used so a missing vertex fails with a clear assertion.
+        AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
+        assertNotNull(entityVertex);
+
+        AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_X);
+        assertNotNull(classificationVertex);
+
+        AtlasEntity entityUpdated = getEntity(HDFS_PATH_EMPLOYEES);
+        assertNotNull(entityUpdated.getPendingTasks());
+
+        List<String> impactedEntities = entityGraphMapper.propagateClassification(hdfs_employees.getGuid(), classificationVertex.getId().toString(), StringUtils.EMPTY);
+        assertNotNull(impactedEntities);
+    }
+
+    /**
+     * Switches {@code tagY} to propagate = true, checks that tasks were queued on the
+     * request context and then runs the update propagation directly.
+     */
+    @Test(dependsOnMethods = "add")
+    public void update() throws AtlasBaseException {
+        AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
+
+        AtlasClassification tagY = new AtlasClassification(TAG_NAME_Y);
+        tagY.setEntityGuid(hdfs_employees.getGuid());
+        tagY.setPropagate(true);
+
+        entityStore.updateClassifications(hdfs_employees.getGuid(), Collections.singletonList(tagY));
+
+        AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
+        assertNotNull(entityVertex);
+
+        AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_Y);
+        assertNotNull(classificationVertex);
+
+        assertNotNull(RequestContext.get().getQueuedTasks());
+        assertTrue(RequestContext.get().getQueuedTasks().size() > 0, "No tasks were queued!");
+
+        List<String> impactedEntities = entityGraphMapper.updateClassificationsPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString(), StringUtils.EMPTY);
+        assertNotNull(impactedEntities);
+    }
+
+    /**
+     * Deletes {@code tagX} from the entity and then runs the delete propagation for the
+     * classification vertex that was looked up before the delete.
+     */
+    @Test(dependsOnMethods = "update")
+    public void delete() throws AtlasBaseException {
+        AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
+        entityGraphMapper.propagateClassification(hdfs_employees.getGuid(), StringUtils.EMPTY, StringUtils.EMPTY);
+
+        AtlasClassification tagX = new AtlasClassification(TAG_NAME_X);
+        tagX.setEntityGuid(hdfs_employees.getGuid());
+        tagX.setPropagate(false);
+
+        // The classification vertex id must be captured before the classification is deleted.
+        AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
+        assertNotNull(entityVertex);
+
+        AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_X);
+        assertNotNull(classificationVertex);
+
+        entityStore.deleteClassification(HDFS_PATH_EMPLOYEES, tagX.getTypeName());
+
+        List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(classificationVertex.getId().toString());
+        assertNotNull(impactedEntities);
+    }
+
+    /**
+     * Fetches an entity by GUID from the entity store.
+     *
+     * @param entityGuid GUID of the entity to load
+     * @return the entity part of the store's {@code AtlasEntityWithExtInfo} result
+     */
+    private AtlasEntity getEntity(String entityGuid) throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(entityGuid);
+
+        return entityWithExtInfo.getEntity();
+    }
+}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/tasks/BaseTaskFixture.java b/repository/src/test/java/org/apache/atlas/tasks/BaseTaskFixture.java
new file mode 100644
index 0000000..51e9bf7
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/tasks/BaseTaskFixture.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Shared fixture for the task tests: injected graph and task registry, two spy
+ * task implementations and a factory that creates them by task type.
+ */
+public class BaseTaskFixture {
+    protected static final String SPYING_TASK_ADD            = "add";
+    protected static final String SPYING_TASK_ERROR_THROWING = "errorThrowingTask";
+
+    @Inject
+    protected AtlasGraph graph;
+
+    @Inject
+    protected TaskRegistry taskRegistry;
+
+    /** Task that records that it ran and reports {@code COMPLETE}. */
+    static class SpyConcreteTask extends AbstractTask {
+        private boolean taskPerformed;
+
+        public SpyConcreteTask(AtlasTask atlasTask) {
+            super(atlasTask);
+        }
+
+        @Override
+        public AtlasTask.Status perform() {
+            this.taskPerformed = true;
+
+            return AtlasTask.Status.COMPLETE;
+        }
+
+        /** @return {@code true} once {@link #perform()} has been called */
+        public boolean taskPerformed() {
+            return this.taskPerformed;
+        }
+    }
+
+    /** Task that records that it ran and then always throws a {@link NullPointerException}. */
+    static class SpyErrorThrowingTask extends AbstractTask {
+        private boolean taskPerformed;
+
+        public SpyErrorThrowingTask(AtlasTask atlasTask) {
+            super(atlasTask);
+        }
+
+        @Override
+        public AtlasTask.Status perform() {
+            this.taskPerformed = true;
+
+            throw new NullPointerException("SpyErrorThrowingTask: NullPointerException Encountered!");
+        }
+
+        /** @return {@code true} once {@link #perform()} has been called */
+        public boolean taskPerformed() {
+            return this.taskPerformed;
+        }
+    }
+
+    /**
+     * Factory for the two spy task types. It remembers the most recently created
+     * instance of each type so a test can inspect it afterwards.
+     */
+    static class SpyingFactory implements TaskFactory {
+        private SpyConcreteTask      addTask;
+        private SpyErrorThrowingTask errorTask;
+
+        /**
+         * Creates the spy task matching the type of {@code atlasTask}.
+         *
+         * @param atlasTask task description; its type selects the implementation
+         * @return the new spy task, or {@code null} for a type this factory does not know
+         */
+        @Override
+        public AbstractTask create(AtlasTask atlasTask) {
+            // Same constants as getSupportedTypes(), so the two cannot drift apart.
+            switch (atlasTask.getType()) {
+                case SPYING_TASK_ADD:
+                    addTask = new SpyConcreteTask(atlasTask);
+                    return addTask;
+
+                case SPYING_TASK_ERROR_THROWING:
+                    errorTask = new SpyErrorThrowingTask(atlasTask);
+                    return errorTask;
+
+                default:
+                    return null;
+            }
+        }
+
+        /** @return a new mutable list holding the two spy task types */
+        @Override
+        public List<String> getSupportedTypes() {
+            // Plain ArrayList: no anonymous subclass holding a reference to this factory.
+            List<String> supportedTypes = new ArrayList<>();
+
+            supportedTypes.add(SPYING_TASK_ADD);
+            supportedTypes.add(SPYING_TASK_ERROR_THROWING);
+
+            return supportedTypes;
+        }
+
+        /** @return the last {@code add} task created, or {@code null} if none was created */
+        public SpyConcreteTask getAddTask() {
+            return this.addTask;
+        }
+
+        /** @return the last error-throwing task created, or {@code null} if none was created */
+        public SpyErrorThrowingTask getErrorTask() {
+            return this.errorTask;
+        }
+    }
+
+    /**
+     * Creates a task of the given type through {@code taskManagement}, using
+     * {@code "testUser"} as creator and a single {@code params -> params} parameter.
+     *
+     * @param taskManagement task management instance that creates the task
+     * @param type           task type to create
+     * @return the task returned by {@code taskManagement.createTask}
+     */
+    protected AtlasTask createTask(TaskManagement taskManagement, String type) {
+        return taskManagement.createTask(type, "testUser", Collections.singletonMap("params", "params"));
+    }
+}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/tasks/TaskExecutorTest.java b/repository/src/test/java/org/apache/atlas/tasks/TaskExecutorTest.java
new file mode 100644
index 0000000..f8d131c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/tasks/TaskExecutorTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.commons.lang3.StringUtils;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests {@link TaskExecutor} with the spy tasks from {@link BaseTaskFixture}: nothing
+ * is counted when no task (or only a non-persisted task) is submitted, and persisted
+ * tasks are executed and counted as success or error.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TaskExecutorTest extends BaseTaskFixture {
+    // graph and taskRegistry are the injected fields inherited from BaseTaskFixture;
+    // re-declaring them here would only shadow them.
+
+    @Inject
+    private TaskManagement taskManagement;
+
+    /** An executor that never receives a task must leave the statistics at zero. */
+    @Test
+    public void noTasksExecuted() {
+        SpyingFactory spyingFactory = new SpyingFactory();
+        Map<String, TaskFactory> taskFactoryMap = new HashMap<>();
+        // Register into the map that is actually handed to the executor.
+        TaskManagement.createTaskTypeFactoryMap(taskFactoryMap, spyingFactory);
+
+        TaskManagement.Statistics statistics = new TaskManagement.Statistics();
+        new TaskExecutor(taskRegistry, taskFactoryMap, statistics);
+
+        Assert.assertEquals(statistics.getTotal(), 0);
+    }
+
+    /** A task built in memory only (never created through task management) is not counted. */
+    @Test
+    public void tasksNotPersistedIsNotExecuted() throws InterruptedException {
+        SpyingFactory spyingFactory = new SpyingFactory();
+        Map<String, TaskFactory> taskFactoryMap = new HashMap<>();
+        TaskManagement.createTaskTypeFactoryMap(taskFactoryMap, spyingFactory);
+
+        TaskManagement.Statistics statistics = new TaskManagement.Statistics();
+        TaskExecutor taskExecutor = new TaskExecutor(taskRegistry, taskFactoryMap, statistics);
+
+        taskExecutor.addAll(Collections.singletonList(new AtlasTask(SPYING_TASK_ADD, "test", Collections.emptyMap())));
+
+        taskExecutor.waitUntilDone();
+        Assert.assertEquals(statistics.getTotal(), 0);
+    }
+
+    /**
+     * Creates one succeeding and one failing task through task management, commits,
+     * runs both and checks the statistics and the spy flags. The failing task is then
+     * re-submitted until it is marked {@code FAILED}.
+     */
+    @Test
+    public void persistedIsExecuted() throws AtlasBaseException, InterruptedException {
+        SpyingFactory spyingFactory = new SpyingFactory();
+        Map<String, TaskFactory> taskFactoryMap = new HashMap<>();
+        TaskManagement.createTaskTypeFactoryMap(taskFactoryMap, spyingFactory);
+
+        AtlasTask addTask           = taskManagement.createTask(SPYING_TASK_ADD, "test", Collections.emptyMap());
+        AtlasTask errorThrowingTask = taskManagement.createTask(SPYING_TASK_ERROR_THROWING, "test", Collections.emptyMap());
+
+        List<AtlasTask> tasks = new ArrayList<>();
+        tasks.add(addTask);
+        tasks.add(errorThrowingTask);
+
+        TaskManagement.Statistics statistics = new TaskManagement.Statistics();
+        graph.commit();
+
+        TaskExecutor taskExecutor = new TaskExecutor(taskRegistry, taskFactoryMap, statistics);
+        taskExecutor.addAll(tasks);
+
+        taskExecutor.waitUntilDone();
+        Assert.assertEquals(statistics.getTotal(), 2);
+        Assert.assertEquals(statistics.getTotalSuccess(), 1);
+        Assert.assertEquals(statistics.getTotalError(), 1);
+
+        Assert.assertNotNull(spyingFactory.getAddTask());
+        Assert.assertNotNull(spyingFactory.getErrorTask());
+
+        Assert.assertTrue(spyingFactory.getAddTask().taskPerformed());
+        Assert.assertTrue(spyingFactory.getErrorTask().taskPerformed());
+
+        assertTaskUntilFail(errorThrowingTask, taskExecutor);
+    }
+
+    /**
+     * Verifies the stored state of the failing task after its first attempt (error
+     * message set, attempt count 1, status {@code PENDING}), then re-submits it from
+     * the current attempt count up to {@link AtlasTask#MAX_ATTEMPT_COUNT} and expects
+     * the in-memory task to end up {@code FAILED}.
+     *
+     * @param errorThrowingTask the task that throws on every attempt
+     * @param taskExecutor      executor used to re-submit the task
+     */
+    private void assertTaskUntilFail(AtlasTask errorThrowingTask, TaskExecutor taskExecutor) throws AtlasBaseException, InterruptedException {
+        AtlasTask errorTaskFromDB = taskManagement.getByGuid(errorThrowingTask.getGuid());
+        Assert.assertNotNull(errorTaskFromDB);
+        Assert.assertTrue(StringUtils.isNotEmpty(errorTaskFromDB.getErrorMessage()));
+        Assert.assertEquals(errorTaskFromDB.getAttemptCount(), 1);
+        Assert.assertEquals(errorTaskFromDB.getStatus(), AtlasTask.Status.PENDING);
+
+        for (int i = errorTaskFromDB.getAttemptCount(); i <= AtlasTask.MAX_ATTEMPT_COUNT; i++) {
+            taskExecutor.addAll(Collections.singletonList(errorThrowingTask));
+        }
+
+        taskExecutor.waitUntilDone();
+        graph.commit();
+        Assert.assertEquals(errorThrowingTask.getStatus(), AtlasTask.Status.FAILED);
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/tasks/TaskManagementTest.java b/repository/src/test/java/org/apache/atlas/tasks/TaskManagementTest.java
new file mode 100644
index 0000000..6f0b89c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/tasks/TaskManagementTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.testng.Assert;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TaskManagementTest extends BaseTaskFixture {
+
+ private static class NullFactory implements TaskFactory {
+ @Override
+ public AbstractTask create(AtlasTask atlasTask) {
+ return null;
+ }
+
+ @Override
+ public List<String> getSupportedTypes() {
+ return null;
+ }
+ }
+
+ @Test
+ public void factoryReturningNullIsHandled() throws AtlasException {
+ TaskManagement taskManagement = new TaskManagement(null, taskRegistry, new NullFactory());
+ taskManagement.start();
+ }
+
+ @Test
+ public void taskSucceedsTaskVertexRemoved() throws AtlasException, InterruptedException, AtlasBaseException {
+ SpyingFactory spyingFactory = new SpyingFactory();
+ TaskManagement taskManagement = new TaskManagement(null, taskRegistry, spyingFactory);
+ taskManagement.start();
+
+ AtlasTask spyTask = createTask(taskManagement, SPYING_TASK_ADD);
+ AtlasTask spyTaskError = createTask(taskManagement, SPYING_TASK_ERROR_THROWING);
+ graph.commit();
+
+ taskManagement.addAll(Arrays.asList(spyTask, spyTaskError));
+
+ TimeUnit.SECONDS.sleep(5);
+ Assert.assertTrue(spyingFactory.getAddTask().taskPerformed());
+ Assert.assertTrue(spyingFactory.getErrorTask().taskPerformed());
+
+ AtlasTask task = taskManagement.getByGuid(spyTask.getGuid());
+ Assert.assertNull(task);
+ }
+
+ @Test
+ public void severalTaskAdds() throws AtlasException, InterruptedException {
+ int MAX_THREADS = 5;
+
+ TaskManagement taskManagement = new TaskManagement(null, taskRegistry);
+ taskManagement.start();
+
+ Thread[] threads = new Thread[MAX_THREADS];
+ for (int i = 0; i < MAX_THREADS; i++) {
+ threads[i] = new Thread(() -> {
+ try {
+ AtlasTask spyAdd = taskManagement.createTask(SPYING_TASK_ADD, "test", Collections.emptyMap());
+ AtlasTask spyErr = taskManagement.createTask(SPYING_TASK_ERROR_THROWING, "test", Collections.emptyMap());
+
+ taskManagement.addAll(Collections.singletonList(spyAdd));
+ taskManagement.addAll(Collections.singletonList(spyErr));
+
+ Thread.sleep(10000);
+ for (int j = 0; j <= AtlasTask.MAX_ATTEMPT_COUNT; j++) {
+ taskManagement.addAll(Collections.singletonList(spyErr));
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ for (int i = 0; i < MAX_THREADS; i++) {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < MAX_THREADS; i++) {
+ threads[i].join();
+ }
+ }
+}
diff --git a/repository/src/test/java/org/apache/atlas/tasks/TaskRegistryTest.java b/repository/src/test/java/org/apache/atlas/tasks/TaskRegistryTest.java
new file mode 100644
index 0000000..7f139dc
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/tasks/TaskRegistryTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.testng.Assert;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests {@link TaskRegistry}: saving a task, reading it back, updating its status,
+ * deleting it, and listing pending tasks.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TaskRegistryTest {
+    @Inject
+    private AtlasGraph graph;
+
+    @Inject
+    private TaskRegistry registry;
+
+    /**
+     * Saves a task, compares the saved copy with the original, bumps the attempt
+     * count and status, reads the task back and finally deletes it.
+     */
+    @Test
+    public void basic() throws AtlasException, AtlasBaseException {
+        AtlasTask task = new AtlasTask("abcd", "test", Collections.singletonMap("p1", "p1"));
+        Assert.assertNull(registry.getById(task.getGuid()));
+
+        AtlasTask   taskFromVertex = registry.save(task);
+        AtlasVertex taskVertex     = registry.getVertex(task.getGuid());
+
+        // Check the vertex before it is handed to updateStatus() below.
+        Assert.assertNotNull(taskVertex);
+
+        Assert.assertEquals(taskFromVertex.getGuid(), task.getGuid());
+        Assert.assertEquals(taskFromVertex.getType(), task.getType());
+        Assert.assertEquals(taskFromVertex.getAttemptCount(), task.getAttemptCount());
+        Assert.assertEquals(taskFromVertex.getParameters(), task.getParameters());
+        Assert.assertEquals(taskFromVertex.getCreatedBy(), task.getCreatedBy());
+
+        taskFromVertex.incrementAttemptCount();
+        taskFromVertex.setStatusPending();
+        registry.updateStatus(taskVertex, taskFromVertex);
+        registry.commit();
+
+        taskFromVertex = registry.getById(task.getGuid());
+        Assert.assertEquals(taskFromVertex.getStatus(), AtlasTask.Status.PENDING);
+        Assert.assertEquals(taskFromVertex.getAttemptCount(), 1);
+
+        registry.deleteByGuid(taskFromVertex.getGuid());
+
+        try {
+            AtlasTask t = registry.getById(taskFromVertex.getGuid());
+            Assert.assertNull(t);
+        } catch (IllegalStateException ignored) {
+            // Accepted as well: the test treats this exception as proof that the vertex is deleted.
+        }
+    }
+
+    /**
+     * Saves three tasks, expects exactly those to be pending and in save order,
+     * deletes them and expects no pending task afterwards.
+     *
+     * <p>NOTE(review): the size assertions presumably require that no other test left
+     * pending tasks in the shared graph -- confirm test isolation.
+     */
+    @Test
+    public void pendingTasks() throws AtlasBaseException {
+        final int    MAX_TASKS        = 3;
+        final String TASK_TYPE_FORMAT = "abcd:%d";
+
+        for (int i = 0; i < MAX_TASKS; i++) {
+            AtlasTask task = new AtlasTask(String.format(TASK_TYPE_FORMAT, i), "test", Collections.singletonMap("p1", "p1"));
+            registry.save(task);
+        }
+
+        List<AtlasTask> pendingTasks = registry.getPendingTasks();
+        Assert.assertEquals(pendingTasks.size(), MAX_TASKS);
+
+        for (int i = 0; i < MAX_TASKS; i++) {
+            Assert.assertEquals(pendingTasks.get(i).getType(), String.format(TASK_TYPE_FORMAT, i));
+            registry.deleteByGuid(pendingTasks.get(i).getGuid());
+        }
+
+        graph.commit();
+        pendingTasks = registry.getPendingTasks();
+        Assert.assertEquals(pendingTasks.size(), 0);
+    }
+}
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 37d23c2..e144d36 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.lang.StringUtils;
@@ -30,13 +31,13 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.Map;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.HashMap;
+import java.util.Set;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
@@ -61,6 +62,8 @@ public class RequestContext {
private final Set<String> entitiesToSkipUpdate = new HashSet<>();
private final Set<String> onlyCAUpdateEntities = new HashSet<>();
private final Set<String> onlyBAUpdateEntities = new HashSet<>();
+ private final List<AtlasTask> queuedTasks = new ArrayList<>();
+
private String user;
private Set<String> userGroups;
@@ -122,6 +125,7 @@ public class RequestContext {
this.entitiesToSkipUpdate.clear();
this.onlyCAUpdateEntities.clear();
this.onlyBAUpdateEntities.clear();
+ this.queuedTasks.clear();
if (metrics != null && !metrics.isEmpty()) {
METRICS.debug(metrics.toString());
@@ -446,6 +450,14 @@ public class RequestContext {
}
}
+ /**
+ * Appends {@code task} to the list of tasks queued on this request context.
+ *
+ * @param task the task to queue; it is added to the list as-is, without any check
+ */
+ public void queueTask(AtlasTask task) {
+ queuedTasks.add(task);
+ }
+
+ /**
+ * Returns the tasks queued on this request context.
+ *
+ * @return the internal {@code queuedTasks} list itself, not a copy
+ */
+ public List<AtlasTask> getQueuedTasks() {
+ return this.queuedTasks;
+ }
+
public class EntityGuidPair {
private final Object entity;
private final String guid;
diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
index bb7f8fc..ba8f088 100644
--- a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
+++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
@@ -32,8 +32,8 @@ public interface ActiveStateChangeHandler {
TYPEDEF_STORE_INITIALIZER(2),
ATLAS_PATCH_SERVICE(3),
DEFAULT_METADATA_SERVICE(4),
- NOTIFICATION_HOOK_CONSUMER(5);
-
+ NOTIFICATION_HOOK_CONSUMER(5),
+ TASK_MANAGEMENT(6);
private final int order;
diff --git a/test-tools/pom.xml b/test-tools/pom.xml
index 50ee675..245f07f 100644
--- a/test-tools/pom.xml
+++ b/test-tools/pom.xml
@@ -49,10 +49,6 @@
<artifactId>gmetric4j</artifactId>
</exclusion>
<exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </exclusion>
- <exclusion>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</exclusion>
diff --git a/test-tools/src/main/resources/log4j.properties b/test-tools/src/main/resources/log4j.properties
index 4db0598..e3e159c 100644
--- a/test-tools/src/main/resources/log4j.properties
+++ b/test-tools/src/main/resources/log4j.properties
@@ -30,3 +30,12 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t]
# Set both appenders (file and console) on the root logger.
#log4j.rootLogger=INFO, file, console
log4j.rootLogger=ERROR, file
+
+# Tasks log file
+# Appender for the TASKS category; the log is written below the build's target directory.
+log4j.appender.TASKS=org.apache.log4j.RollingFileAppender
+log4j.appender.TASKS.File=target/tasks.log
+log4j.appender.TASKS.Append=true
+# MaxFileSize is an option of RollingFileAppender itself; PatternLayout has no such option.
+log4j.appender.TASKS.MaxFileSize=1024MB
+log4j.appender.TASKS.layout=org.apache.log4j.PatternLayout
+log4j.appender.TASKS.layout.ConversionPattern=[%t] %d %x %m%n
+log4j.category.TASKS=INFO,TASKS
diff --git a/test-tools/src/main/resources/solr/core-template/solrconfig.xml b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
index 4d7e444..977ebc4 100644
--- a/test-tools/src/main/resources/solr/core-template/solrconfig.xml
+++ b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
@@ -433,29 +433,19 @@
will be overridden by parameters in the request
-->
<!--
- 35x_t __guid
- f0l_t __typeName
- i6d_l __timestamp
- jr9_t __state
- lc5_t __classificationsText
- mx1_t __classificationNames
- iyt_l __modificationTimestamp
- zk5_t __customAttributes
- 1151_t __labels
- ohx_t __propagatedClassificationNames
- 3thh_s Asset.__s_name
- 3z0l_s Asset.__s_owner
- 3wn9_t Asset.description
- 3v2d_s Asset.__s_displayName
- 3xfp_s Asset.__s_userDescription
- 3mdh_t Referenceable.qualifiedName
- c45h_t hive_serde.name
- cb9h_t hive_process.queryText
- c83p_t hive_process.userName
+ Steps to generate these keys:
+ - Add your fields to GraphBackedSearchIndexer.
+ - Run AtlasDiscoveryServiceTest. The test will most likely fail, because the order in which the fields are
+ loaded from the models will have changed, and with it the generated field names.
+ - From typeRegistry, use typeRegistry.commonIndexFieldNameCache.values().stream().collect(Collectors.joining(" "))
+ to collect all the field names. Paste them here in the 'qf' tag.
+ - Clean and build.
+ - Re-run the test. It should pass now.
+ Thanks to @pshah for the steps.
-->
<lst name="defaults">
<str name="defType">edismax</str>
- <str name="qf">35x_t f0l_t i6d_l jr9_t lc5_t mx1_t iyt_l zk5_t 1151_t ohx_t 3thh_s 3z0l_s 3wn9_t 3v2d_s 3xfp_s 3mdh_t c45h_t cb9h_t c83p_t</str>
+ <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_t 16o5_t 1891_t 19tx_t 1bet_t 1czp_t 1fd1_t 1gxx_l 1iit_t 3v2d_t 47ph_s 464l_s 43r9_s 426d_s 45c5_t 4d8l_t 49ad_t 4av9_l 4bnp_t 4hz9_t 4ged_t 4jk5_t 4qo5_t 4w79_t 50xx_t 4ykl_t 4zd1_t 4xs5_t 579h_t 56h1_t 5f5x_l 5gqt_l 5dl1_t 5c05_t 5ibp_t 581x_t 58ud_t 5af9_t 5slh_t 5tdx_l 5pfp_t 5m9x_l 5nut_l 5j45_t 5r0l_t 8ikl_t 8ao5_t 8k5h_t 8g79_t 8jd1_t 8hs5_ [...]
<str name="hl.fl">*</str>
<bool name="hl.requireFieldMatch">true</bool>
<bool name="lowercaseOperators">true</bool>
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index e8fc111..714b400 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -55,9 +55,11 @@ import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.MigrationProgressService;
import org.apache.atlas.repository.impexp.ZipSink;
+import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.patches.AtlasPatchManager;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetricsService;
+import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.SearchTracker;
@@ -78,6 +80,7 @@ import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;
+import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
@@ -135,7 +138,7 @@ public class AdminResource {
private static final String UI_DATE_FORMAT = "atlas.ui.date.format";
private static final String UI_DATE_DEFAULT_FORMAT = "MM/DD/YYYY hh:mm:ss A";
private static final String OPERATION_STATUS = "operationStatus";
- private static final List TIMEZONE_LIST = Arrays.asList(TimeZone.getAvailableIDs());
+ private static final List TIMEZONE_LIST = Arrays.asList(TimeZone.getAvailableIDs());
@Context
private HttpServletRequest httpServletRequest;
@@ -155,6 +158,7 @@ public class AdminResource {
private final MigrationProgressService migrationProgressService;
private final ReentrantLock importExportOperationLock;
private final ExportImportAuditService exportImportAuditService;
+ private final TaskManagement taskManagement;
private final AtlasServerService atlasServerService;
private final AtlasEntityStore entityStore;
private final AtlasPatchManager patchManager;
@@ -178,7 +182,8 @@ public class AdminResource {
MigrationProgressService migrationProgressService,
AtlasServerService serverService,
ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore,
- AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository) {
+ AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository,
+ TaskManagement taskManagement) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.exportService = exportService;
@@ -193,6 +198,7 @@ public class AdminResource {
this.patchManager = patchManager;
this.auditService = auditService;
this.auditRepository = auditRepository;
+ this.taskManagement = taskManagement;
if (atlasProperties != null) {
this.defaultUIVersion = atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V2);
@@ -205,6 +211,11 @@ public class AdminResource {
}
}
+ @PostConstruct
+ public void init() {
+ taskManagement.queuePendingTasks();
+ }
+
/**
* Fetches the thread stack dump for this application.
*
@@ -737,6 +748,22 @@ public class AdminResource {
return ret;
}
+ @GET
+ @Path("/tasks")
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public List<AtlasTask> getTaskStatus(@QueryParam("guids") List<String> guids) throws AtlasBaseException {
+ return CollectionUtils.isNotEmpty(guids) ? taskManagement.getByGuids(guids) : taskManagement.getAll();
+ }
+
+ @DELETE
+ @Path("/tasks")
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public void deleteTask(@QueryParam("guids") List<String> guids) throws AtlasBaseException {
+ if (CollectionUtils.isNotEmpty(guids)) {
+ taskManagement.deleteByGuids(guids);
+ }
+ }
+
private String getEditableEntityTypes(Configuration config) {
String ret = DEFAULT_EDITABLE_ENTITY_TYPES;
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
index d6658fe..61aa313 100755
--- a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
@@ -20,12 +20,10 @@ package org.apache.atlas.web.service;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.BeanUtil;
-import org.apache.atlas.RequestContext;
+import org.apache.atlas.util.BeanUtil;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.AtlasAuditEntry;
import org.apache.atlas.repository.audit.AtlasAuditService;
-import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -37,8 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index 77422b2..ef5350d 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -51,7 +51,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
- AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null);
+ AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
@@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
- AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null);
+ AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
verify(serviceState).getState();
diff --git a/webapp/src/test/resources/atlas-application.properties b/webapp/src/test/resources/atlas-application.properties
index 3847a3d..adeade8 100644
--- a/webapp/src/test/resources/atlas-application.properties
+++ b/webapp/src/test/resources/atlas-application.properties
@@ -127,3 +127,6 @@ atlas.authentication.method.kerberos=false
######### Gremlin Search Configuration #########
# Set to false to disable gremlin search.
atlas.search.gremlin.enable=true
+
+######### Configure use of Tasks #########
+atlas.tasks.enabled=false
diff --git a/webapp/src/test/resources/template_metadata.csv b/webapp/src/test/resources/template_metadata.csv
index 6b6359c..2f662ba 100644
--- a/webapp/src/test/resources/template_metadata.csv
+++ b/webapp/src/test/resources/template_metadata.csv
@@ -1,2 +1,2 @@
TypeName,UniqueAttributeValue,BusinessAttributeName,BusinessAttributeValue,UniqueAttributeName[optional]
-hive_table_v2,tableqZPo3C488c,bmWithAllTypes.attr8,"Awesome Attribute 1",qualifiedName
+hive_table_v2,tableY2Q72pP2Do,bmWithAllTypes.attr8,"Awesome Attribute 1",qualifiedName