You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/04/14 23:50:55 UTC

[atlas] branch branch-2.0 updated: ATLAS-3919: Handling classification propagation as deferred-action

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 6eb970c  ATLAS-3919: Handling classification propagation as deferred-action
6eb970c is described below

commit 6eb970ce23d4ce290687512d3871db8f086a9e4f
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Apr 14 16:49:24 2021 -0700

    ATLAS-3919: Handling classification propagation as deferred-action
    
    Co-authored-by: Jayendra Parab <ja...@gmail.com>
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
    (cherry picked from commit 0751f16250220271ce63ab7523de37e1f85defda)
---
 .../org/apache/atlas/repository/Constants.java     |  19 ++
 distro/src/conf/atlas-log4j.xml                    |  15 ++
 .../java/org/apache/atlas/AtlasConfiguration.java  |   3 +-
 .../apache/atlas/model/instance/AtlasEntity.java   |  14 +
 .../org/apache/atlas/model/tasks/AtlasTask.java    | 192 ++++++++++++++
 .../org/apache/atlas/type/AtlasEntityType.java     |   1 +
 .../org/apache/atlas/type/AtlasTypeRegistry.java   |   2 +-
 .../main/java/org/apache/atlas/type/Constants.java |   1 +
 .../test/resources/atlas-application.properties    |   3 +
 .../apache/atlas/GraphTransactionInterceptor.java  |  24 +-
 .../repository/graph/GraphBackedSearchIndexer.java |   7 +
 .../store/graph/v1/DeleteHandlerDelegate.java      |  16 +-
 .../repository/store/graph/v1/DeleteHandlerV1.java | 284 +++++++++++++++------
 .../store/graph/v1/HardDeleteHandlerV1.java        |   5 +-
 .../store/graph/v1/SoftDeleteHandlerV1.java        |   5 +-
 .../store/graph/v2/AtlasGraphUtilsV2.java          |  16 ++
 .../store/graph/v2/AtlasRelationshipStoreV2.java   | 159 +-----------
 .../store/graph/v2/EntityGraphMapper.java          | 236 ++++++++++++++++-
 .../store/graph/v2/EntityGraphRetriever.java       |  93 +++++--
 .../store/graph/v2/bulkimport/MigrationImport.java |   4 +-
 .../tasks/ClassificationPropagateTaskFactory.java  |  93 +++++++
 .../v2/tasks/ClassificationPropagationTasks.java   |  89 +++++++
 .../store/graph/v2/tasks/ClassificationTask.java   | 137 ++++++++++
 .../java/org/apache/atlas/tasks/AbstractTask.java  |  68 +++++
 .../java/org/apache/atlas/tasks/TaskExecutor.java  | 181 +++++++++++++
 .../java/org/apache/atlas/tasks/TaskFactory.java   |  28 +-
 .../org/apache/atlas/tasks/TaskManagement.java     | 280 ++++++++++++++++++++
 .../java/org/apache/atlas/tasks/TaskRegistry.java  | 235 +++++++++++++++++
 .../main/java/org/apache/atlas/util}/BeanUtil.java |   2 +-
 .../test/java/org/apache/atlas/TestModules.java    |   8 +-
 .../atlas/discovery/AtlasDiscoveryServiceTest.java |   2 +-
 .../ClassificationPropagationWithTasksTest.java    | 227 ++++++++++++++++
 .../org/apache/atlas/tasks/BaseTaskFixture.java    | 116 +++++++++
 .../org/apache/atlas/tasks/TaskExecutorTest.java   | 123 +++++++++
 .../org/apache/atlas/tasks/TaskManagementTest.java | 109 ++++++++
 .../org/apache/atlas/tasks/TaskRegistryTest.java   |  99 +++++++
 .../main/java/org/apache/atlas/RequestContext.java |  20 +-
 .../atlas/listener/ActiveStateChangeHandler.java   |   4 +-
 test-tools/pom.xml                                 |   4 -
 test-tools/src/main/resources/log4j.properties     |   9 +
 .../resources/solr/core-template/solrconfig.xml    |  30 +--
 .../apache/atlas/web/resources/AdminResource.java  |  31 ++-
 .../apache/atlas/web/service/EmbeddedServer.java   |   6 +-
 .../atlas/web/resources/AdminResourceTest.java     |   4 +-
 .../test/resources/atlas-application.properties    |   3 +
 webapp/src/test/resources/template_metadata.csv    |   2 +-
 46 files changed, 2663 insertions(+), 346 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 771287f..4df38a5 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -94,6 +94,7 @@ public final class Constants {
     public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY  = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames");
     public static final String CUSTOM_ATTRIBUTES_PROPERTY_KEY       = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "customAttributes");
     public static final String LABELS_PROPERTY_KEY                  = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "labels");
+    public static final String PENDING_TASKS_PROPERTY_KEY           = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks");
 
     /**
      * Patch vertices property keys.
@@ -208,6 +209,24 @@ public final class Constants {
     public static final String  TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE   = "ADD_MANDATORY_ATTRIBUTE";
 
     /*
+     * Task related constants
+     */
+    public static final String TASK_PREFIX            = INTERNAL_PROPERTY_KEY_PREFIX + "task_";
+    public static final String TASK_TYPE_PROPERTY_KEY = encodePropertyKey(TASK_PREFIX + "v_type");
+    public static final String TASK_TYPE_NAME         = INTERNAL_PROPERTY_KEY_PREFIX + "AtlasTaskDef";
+    public static final String TASK_GUID              = encodePropertyKey(TASK_PREFIX + "guid");
+    public static final String TASK_TYPE              = encodePropertyKey(TASK_PREFIX + "type");
+    public static final String TASK_CREATED_TIME      = encodePropertyKey(TASK_PREFIX + "timestamp");
+    public static final String TASK_UPDATED_TIME      = encodePropertyKey(TASK_PREFIX + "modificationTimestamp");
+    public static final String TASK_CREATED_BY        = encodePropertyKey(TASK_PREFIX + "createdBy");
+    public static final String TASK_STATUS            = encodePropertyKey(TASK_PREFIX + "status");
+    public static final String TASK_ATTEMPT_COUNT     = encodePropertyKey(TASK_PREFIX + "attemptCount");
+    public static final String TASK_PARAMETERS        = encodePropertyKey(TASK_PREFIX + "parameters");
+    public static final String TASK_ERROR_MESSAGE     = encodePropertyKey(TASK_PREFIX + "errorMessage");
+    public static final String TASK_START_TIME        = encodePropertyKey(TASK_PREFIX + "startTime");
+    public static final String TASK_END_TIME          = encodePropertyKey(TASK_PREFIX + "endTime");
+
+    /*
      * All supported file-format extensions for Bulk Imports through file upload
      */
     public enum SupportedFileExtensions { XLSX, XLS, CSV }
diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml
index 7df963e..90e1301 100755
--- a/distro/src/conf/atlas-log4j.xml
+++ b/distro/src/conf/atlas-log4j.xml
@@ -57,6 +57,16 @@
         </layout>
     </appender>
 
+    <appender name="TASKS" class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/tasks.log"/>
+        <param name="Append" value="true"/>
+        <param name="maxFileSize" value="259MB" />
+        <param name="maxBackupIndex" value="20" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%t %d %x %m%n (%C{1}:%L)"/>
+        </layout>
+    </appender>
+
     <appender name="METRICS" class="org.apache.log4j.RollingFileAppender">
         <param name="File" value="${atlas.log.dir}/metric.log"/>
         <param name="Append" value="true"/>
@@ -144,6 +154,11 @@
         <appender-ref ref="FAILED"/>
     </logger>
 
+    <logger name="TASKS" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="TASKS"/>
+    </logger>
+
     <root>
         <priority value="warn"/>
         <appender-ref ref="FILE"/>
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 08d6c9d..05b0784 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -76,7 +76,8 @@ public enum AtlasConfiguration {
     STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true),
     REBUILD_INDEX("atlas.rebuild.index", false),
     STORE_DIFFERENTIAL_AUDITS("atlas.entity.audit.differential", false),
-    DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true);
+    DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true),
+    TASKS_USE_ENABLED("atlas.tasks.enabled", true);
 
     private static final Configuration APPLICATION_PROPERTIES;
 
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 4d8c948..5f8c6e8 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -95,6 +95,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
     private Map<String, String>              customAttributes;
     private Map<String, Map<String, Object>> businessAttributes;
     private Set<String>                      labels;
+    private Set<String>                      pendingTasks; // read-only field i.e. value provided is ignored during entity create/update
 
     @JsonIgnore
     private static AtomicLong s_nextId = new AtomicLong(System.nanoTime());
@@ -220,6 +221,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
             setCustomAttributes(other.getCustomAttributes());
             setBusinessAttributes(other.getBusinessAttributes());
             setLabels(other.getLabels());
+            setPendingTasks(other.getPendingTasks());
         }
     }
 
@@ -393,6 +395,14 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
         this.labels = labels;
     }
 
+    public Set<String> getPendingTasks() {
+        return pendingTasks;
+    }
+
+    public void setPendingTasks(Set<String> pendingTasks) {
+        this.pendingTasks = pendingTasks;
+    }
+
     public List<AtlasClassification> getClassifications() { return classifications; }
 
     public void setClassifications(List<AtlasClassification> classifications) { this.classifications = classifications; }
@@ -443,6 +453,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
         setCustomAttributes(null);
         setBusinessAttributes(null);
         setLabels(null);
+        setPendingTasks(null);
     }
 
     private static String nextInternalId() {
@@ -486,6 +497,9 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
         sb.append(", labels=[");
         dumpObjects(labels, sb);
         sb.append("]");
+        sb.append(", pendingTasks=[");
+        dumpObjects(pendingTasks, sb);
+        sb.append("]");
         sb.append('}');
 
         return sb;
diff --git a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java
new file mode 100644
index 0000000..7918f0b
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.tasks;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtlasTask {
+    @JsonIgnore
+    public static final int MAX_ATTEMPT_COUNT = 3;
+
+    public enum Status {
+        PENDING,
+        IN_PROGRESS,
+        COMPLETE,
+        FAILED;
+    }
+
+    private String              type;
+    private String              guid;
+    private String              createdBy;
+    private Date                createdTime;
+    private Date                updatedTime;
+    private Date                startTime;
+    private Date                endTime;
+    private Map<String, Object> parameters;
+    private int                 attemptCount;
+    private String              errorMessage;
+    private Status              status;
+
+    public AtlasTask() {
+    }
+
+    public AtlasTask(String type, String createdBy, Map<String, Object> parameters) {
+        this.guid         = UUID.randomUUID().toString();
+        this.type         = type;
+        this.createdBy    = createdBy;
+        this.createdTime  = new Date();
+        this.updatedTime  = this.createdTime;
+        this.parameters   = parameters;
+        this.status       = Status.PENDING;
+        this.attemptCount = 0;
+    }
+
+    public String getGuid() {
+        return guid;
+    }
+
+    public void setGuid(String guid) {
+        this.guid = guid;
+    }
+
+    public String getCreatedBy() {
+        return createdBy;
+    }
+
+    public void setCreatedBy(String createdBy) {
+        this.createdBy = createdBy;
+    }
+
+    public Date getCreatedTime() {
+        return createdTime;
+    }
+
+    public void setCreatedTime(Date createdTime) {
+        this.createdTime = createdTime;
+    }
+
+    public Date getUpdatedTime() {
+        return updatedTime;
+    }
+
+    public void setUpdatedTime(Date updatedTime) {
+        this.updatedTime = updatedTime;
+    }
+
+    public Map<String, Object> getParameters() {
+        return parameters;
+    }
+
+    public void setParameters(Map<String, Object> val) {
+        this.parameters = val;
+    }
+
+    public void setType(String val) {
+        this.type = val;
+    }
+
+    public String getType() {
+        return this.type;
+    }
+
+    public void setStatus(String val) {
+        if (StringUtils.isNotEmpty(val)) {
+            this.status = Status.valueOf(val);
+        }
+    }
+
+    public void setStatus(Status val) {
+        this.status = val;
+    }
+
+    public Status getStatus() {
+        return this.status;
+    }
+
+    public int getAttemptCount() {
+        return attemptCount;
+    }
+
+    public void setAttemptCount(int attemptCount) {
+        this.attemptCount = attemptCount;
+    }
+
+    public void incrementAttemptCount() {
+        this.attemptCount++;
+    }
+
+    public void setStatusPending() {
+        this.status = Status.PENDING;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    public Date getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(Date startTime) {
+        this.startTime = startTime;
+    }
+
+    public Date getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(Date endTime) {
+        this.endTime = endTime;
+    }
+
+    @JsonIgnore
+    public void start() {
+        this.setStatus(Status.IN_PROGRESS);
+        this.setStartTime(new Date());
+    }
+
+    @JsonIgnore
+    public void end() {
+        this.status = Status.COMPLETE;
+        this.setEndTime(new Date());
+    }
+
+    @JsonIgnore
+    public void updateStatusFromAttemptCount() {
+        setStatus((attemptCount < MAX_ATTEMPT_COUNT) ? AtlasTask.Status.PENDING : AtlasTask.Status.FAILED);
+    }
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 27c7f73..bfd5e98 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -1291,6 +1291,7 @@ public class AtlasEntityType extends AtlasStructType {
                 add(new AtlasAttributeDef(IS_INCOMPLETE_PROPERTY_KEY, ATLAS_TYPE_INT, false, true));
                 add(new AtlasAttributeDef(LABELS_PROPERTY_KEY, ATLAS_TYPE_STRING, false, true));
                 add(new AtlasAttributeDef(CUSTOM_ATTRIBUTES_PROPERTY_KEY, ATLAS_TYPE_STRING, false, true));
+                add(new AtlasAttributeDef(PENDING_TASKS_PROPERTY_KEY, ATLAS_TYPE_STRING, false, true));
             }};
 
             return new AtlasEntityDef(ENTITY_ROOT_NAME, "Root entity for system attributes", "1.0", attributeDefs);
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
index 4c7f8c6..0c05bb3 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
@@ -59,7 +59,7 @@ public class AtlasTypeRegistry {
         registryData              = new RegistryData();
         updateSynchronizer        = new TypeRegistryUpdateSynchronizer(this);
         missingRelationshipDefs   = new HashSet<>();
-        commonIndexFieldNameCache = new HashMap<>();
+        commonIndexFieldNameCache = new LinkedHashMap<>();
 
         resolveReferencesForRootTypes();
         resolveIndexFieldNamesForRootTypes();
diff --git a/intg/src/main/java/org/apache/atlas/type/Constants.java b/intg/src/main/java/org/apache/atlas/type/Constants.java
index 7ee8520..14c9c7c 100644
--- a/intg/src/main/java/org/apache/atlas/type/Constants.java
+++ b/intg/src/main/java/org/apache/atlas/type/Constants.java
@@ -47,6 +47,7 @@ public final class Constants {
     public static final String CLASSIFICATION_NAMES_KEY             = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "classificationNames");
     public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY  = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames");
     public static final String IS_INCOMPLETE_PROPERTY_KEY           = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
+    public static final String PENDING_TASKS_PROPERTY_KEY           = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks");
 
     //Classification-Only System Attributes
     public static final String CLASSIFICATION_ENTITY_STATUS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityStatus");
diff --git a/intg/src/test/resources/atlas-application.properties b/intg/src/test/resources/atlas-application.properties
index 7e74d51..50ce01e 100644
--- a/intg/src/test/resources/atlas-application.properties
+++ b/intg/src/test/resources/atlas-application.properties
@@ -144,3 +144,6 @@ atlas.authentication.method.kerberos=false
 #########  Gremlin Search Configuration  #########
 # Set to false to disable gremlin search.
 atlas.search.gremlin.enable=true
+
+#########  Configure use of Tasks  #########
+atlas.tasks.enabled=false
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index 86b369f..343d00d 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -26,7 +26,9 @@ import org.apache.atlas.exception.NotFoundException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.tasks.TaskManagement;
 import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
+import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -53,7 +55,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
     private static final ThreadLocal<Boolean>                   innerFailure               = ThreadLocal.withInitial(() -> Boolean.FALSE);
     private static final ThreadLocal<Map<String, AtlasVertex>>  guidVertexCache            = ThreadLocal.withInitial(() -> new HashMap<>());
 
-    private final AtlasGraph graph;
+    private final AtlasGraph     graph;
+    private final TaskManagement taskManagement;
 
     private static final ThreadLocal<Map<Object, String>> vertexGuidCache =
             new ThreadLocal<Map<Object, String>>() {
@@ -80,8 +83,9 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
             };
 
     @Inject
-    public GraphTransactionInterceptor(AtlasGraph graph) {
-        this.graph = graph;
+    public GraphTransactionInterceptor(AtlasGraph graph, TaskManagement taskManagement) {
+        this.graph          = graph;
+        this.taskManagement = taskManagement;
     }
 
     @Override
@@ -89,7 +93,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         Method        method            = invocation.getMethod();
         String        invokingClass     = method.getDeclaringClass().getSimpleName();
         String        invokedMethodName = method.getName();
-        boolean       logRollback       = method.getAnnotation(GraphTransaction.class).logRollback();
+        boolean       logRollback       = method.getAnnotation(GraphTransaction.class) == null || method.getAnnotation(GraphTransaction.class).logRollback();
 
         boolean isInnerTxn = isTxnOpen.get();
         // Outermost txn marks any subsequent transaction as inner
@@ -164,6 +168,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
             }
 
             OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects();
+
+            if (isSuccess) {
+                submitTasks();
+            }
         }
     }
 
@@ -231,6 +239,14 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         edgeStateCache.get().clear();
     }
 
+    private void submitTasks() {
+        if (CollectionUtils.isEmpty(RequestContext.get().getQueuedTasks()) || taskManagement == null) {
+            return;
+        }
+
+        taskManagement.addAll(RequestContext.get().getQueuedTasks());
+    }
+
     boolean logException(Throwable t) {
         if (t instanceof AtlasBaseException) {
             Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index ca03492..47c8b14 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -334,6 +334,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
             createCommonVertexIndex(management, PROPAGATED_CLASSIFICATION_NAMES_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
             createCommonVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, true);
             createCommonVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, LIST, true, true);
+            createCommonVertexIndex(management, PENDING_TASKS_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, false);
             createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, UniqueKind.NONE, Integer.class, SINGLE, true, true);
             createCommonVertexIndex(management, CUSTOM_ATTRIBUTES_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
             createCommonVertexIndex(management, LABELS_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
@@ -344,6 +345,12 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
             createCommonVertexIndex(management, PATCH_ACTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
             createCommonVertexIndex(management, PATCH_STATE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
 
+            // tasks
+            createCommonVertexIndex(management, TASK_GUID, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
+            createCommonVertexIndex(management, TASK_TYPE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
+            createCommonVertexIndex(management, TASK_CREATED_TIME, UniqueKind.NONE, Long.class, SINGLE, true, false);
+            createCommonVertexIndex(management, TASK_STATUS, UniqueKind.NONE, String.class, SINGLE, true, false);
+
             // create vertex-centric index
             createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
             createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerDelegate.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerDelegate.java
index 1aaea64..0c5ece0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerDelegate.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerDelegate.java
@@ -21,6 +21,7 @@ package org.apache.atlas.repository.store.graph.v1;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.DeleteType;
+import org.apache.atlas.tasks.TaskManagement;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.AtlasRepositoryConfiguration;
 import org.slf4j.Logger;
@@ -38,13 +39,15 @@ public class DeleteHandlerDelegate {
     private final SoftDeleteHandlerV1 softDeleteHandler;
     private final HardDeleteHandlerV1 hardDeleteHandler;
     private final DeleteHandlerV1     defaultHandler;
-    private final AtlasGraph atlasGraph;
+    private final AtlasGraph graph;
+    private final TaskManagement      taskManagement;
 
     @Inject
-    public DeleteHandlerDelegate(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
-        this.atlasGraph = atlasGraph;
-        this.softDeleteHandler = new SoftDeleteHandlerV1(atlasGraph, typeRegistry);
-        this.hardDeleteHandler = new HardDeleteHandlerV1(atlasGraph, typeRegistry);
+    public DeleteHandlerDelegate(AtlasGraph graph, AtlasTypeRegistry typeRegistry, TaskManagement taskManagement) {
+        this.graph = graph;
+        this.taskManagement    = taskManagement;
+        this.softDeleteHandler = new SoftDeleteHandlerV1(graph, typeRegistry, taskManagement);
+        this.hardDeleteHandler = new HardDeleteHandlerV1(graph, typeRegistry, taskManagement);
         this.defaultHandler    = getDefaultConfiguredHandler(typeRegistry);
     }
 
@@ -77,7 +80,8 @@ public class DeleteHandlerDelegate {
 
             LOG.info("Default delete handler set to: {}", handlerFromProperties.getName());
 
-            ret = (DeleteHandlerV1) handlerFromProperties.getConstructor(AtlasGraph.class, AtlasTypeRegistry.class).newInstance(this.atlasGraph, typeRegistry);
+            ret = (DeleteHandlerV1) handlerFromProperties.getConstructor(AtlasGraph.class, AtlasTypeRegistry.class, TaskManagement.class)
+                                    .newInstance(this.graph, typeRegistry, taskManagement);
         } catch (Exception ex) {
             LOG.error("Error instantiating default delete handler. Defaulting to: {}", softDeleteHandler.getClass().getName(), ex);
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 7b2e2d3..20d5e6f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.repository.store.graph.v1;
 
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
@@ -26,6 +27,8 @@ import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.tasks.AtlasTask;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.repository.graph.AtlasEdgeLabel;
@@ -37,6 +40,8 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.DeleteType;
+import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationTask;
+import org.apache.atlas.tasks.TaskManagement;
 import org.apache.atlas.type.*;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
@@ -59,23 +64,31 @@ import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
 import static org.apache.atlas.repository.graph.GraphHelper.*;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
 
 public abstract class DeleteHandlerV1 {
     public static final Logger LOG = LoggerFactory.getLogger(DeleteHandlerV1.class);
 
-    protected final GraphHelper graphHelper;
-    private final AtlasTypeRegistry    typeRegistry;
-    private final EntityGraphRetriever entityRetriever;
-    private final boolean              shouldUpdateInverseReferences;
-    private final boolean              softDelete;
+    private static final boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
 
-    public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete) {
+    protected final GraphHelper          graphHelper;
+    private   final AtlasTypeRegistry    typeRegistry;
+    private   final EntityGraphRetriever entityRetriever;
+    private   final boolean              shouldUpdateInverseReferences;
+    private   final boolean              softDelete;
+    private   final TaskManagement       taskManagement;
+
+
+    public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete, TaskManagement taskManagement) {
         this.typeRegistry                  = typeRegistry;
         this.graphHelper                   = new GraphHelper(graph);
         this.entityRetriever               = new EntityGraphRetriever(graph, typeRegistry);
         this.shouldUpdateInverseReferences = shouldUpdateInverseReference;
         this.softDelete                    = softDelete;
+        this.taskManagement                = taskManagement;
     }
 
     /**
@@ -383,16 +396,24 @@ public abstract class DeleteHandlerV1 {
     }
 
     private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
-        final List<AtlasVertex> classificationVertices   = getPropagationEnabledClassificationVertices(fromVertex);
-        final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, getRelationshipGuid(edge)) : null;
+        final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
+        String                  relationshipGuid       = getRelationshipGuid(edge);
 
-        if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), propagatedEntityVertices.size());
+        if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
+            for (AtlasVertex classificationVertex : classificationVertices) {
+                createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, toVertex, classificationVertex.getIdForDisplay(), relationshipGuid);
             }
+        } else {
+            final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, relationshipGuid) : null;
 
-            for (AtlasVertex classificationVertex : classificationVertices) {
-                addTagPropagation(classificationVertex, propagatedEntityVertices);
+            if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), propagatedEntityVertices.size());
+                }
+
+                for (AtlasVertex classificationVertex : classificationVertices) {
+                    addTagPropagation(classificationVertex, propagatedEntityVertices);
+                }
             }
         }
     }
@@ -561,75 +582,6 @@ public abstract class DeleteHandlerV1 {
         }
     }
 
-    public void removeTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException {
-        if (edge == null) {
-            return;
-        }
-
-        AtlasVertex outVertex = edge.getOutVertex();
-        AtlasVertex inVertex  = edge.getInVertex();
-
-        if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
-            removeTagPropagation(outVertex, inVertex, edge);
-        }
-
-        if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
-            removeTagPropagation(inVertex, outVertex, edge);
-        }
-    }
-
-    private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
-        final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
-        final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, getRelationshipGuid(edge)) : null;
-
-        if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Removing {} propagated tags: for {} from {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size());
-            }
-
-            for (AtlasVertex classificationVertex : classificationVertices) {
-                String            classificationName     = getTypeName(classificationVertex);
-                AtlasVertex       associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
-                List<AtlasVertex> referrals              = entityRetriever.getIncludedImpactedVerticesV2(associatedEntityVertex, getRelationshipGuid(edge));
-
-                for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
-                    if (referrals.contains(impactedEntityVertex)) {
-                        if (LOG.isDebugEnabled()) {
-                            if (org.apache.commons.lang3.StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) {
-                                LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]",
-                                        getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName, getTypeName(associatedEntityVertex));
-                            } else {
-                                LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path",
-                                        getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName);
-                            }
-                        }
-
-                        continue;
-                    }
-
-                    // remove propagated classification edge and classificationName from propagatedTraitNames vertex property
-                    AtlasEdge propagatedEdge = getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex);
-
-                    if (propagatedEdge != null) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(" --> Removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}]",
-                                    getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
-                        }
-
-                        graphHelper.removeEdge(propagatedEdge);
-
-                        removeFromPropagatedClassificationNames(impactedEntityVertex, classificationName);
-                    } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist",
-                                    getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
-                        }
-                    }
-                }
-            }
-        }
-    }
-
     public void deletePropagatedClassification(AtlasVertex entityVertex, String classificationName, String associatedEntityGuid) throws AtlasBaseException {
         AtlasEdge propagatedEdge = getPropagatedClassificationEdge(entityVertex, classificationName, associatedEntityGuid);
 
@@ -1059,7 +1011,11 @@ public abstract class DeleteHandlerV1 {
             boolean     removePropagations   = getRemovePropagations(classificationVertex);
 
             if (isClassificationEdge && removePropagations) {
-                removeTagPropagation(classificationVertex);
+                if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
+                    createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, instanceVertex, classificationVertex.getIdForDisplay(), null);
+                } else {
+                    removeTagPropagation(classificationVertex);
+                }
             }
 
             deleteEdgeReference(edge, CLASSIFICATION, false, false, instanceVertex);
@@ -1089,4 +1045,166 @@ public abstract class DeleteHandlerV1 {
 
         return ret;
     }
+
+    public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
+        PropagateTags oldTagPropagation = getPropagateTags(edge);
+        PropagateTags newTagPropagation = relationship.getPropagateTags();
+
+        if (newTagPropagation != oldTagPropagation) {
+            List<AtlasVertex>                   currentClassificationVertices = getPropagatableClassifications(edge);
+            Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap     = entityRetriever.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
+
+            // Update propagation edge
+            AtlasGraphUtilsV2.setEncodedProperty(edge, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
+
+            List<AtlasVertex>                   updatedClassificationVertices = getPropagatableClassifications(edge);
+            List<AtlasVertex>                   classificationVerticesUnion   = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
+            Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap     = entityRetriever.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
+
+            // compute add/remove propagations list
+            Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap    = new HashMap<>();
+            Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
+
+            if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) {
+                addPropagationsMap.putAll(updatedClassificationsMap);
+
+            } else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
+                removePropagationsMap.putAll(currentClassificationsMap);
+
+            } else {
+                for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
+                    List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
+                    List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
+                    List<AtlasVertex> entitiesAdded              = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities);
+                    List<AtlasVertex> entitiesRemoved            = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
+
+                    if (CollectionUtils.isNotEmpty(entitiesAdded)) {
+                        addPropagationsMap.put(classificationVertex, entitiesAdded);
+                    }
+
+                    if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
+                        removePropagationsMap.put(classificationVertex, entitiesRemoved);
+                    }
+                }
+            }
+
+            for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
+                List<AtlasVertex> entitiesToAddPropagation = addPropagationsMap.get(classificationVertex);
+
+                addTagPropagation(classificationVertex, entitiesToAddPropagation);
+            }
+
+            for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
+                List<AtlasVertex> entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex);
+
+                removeTagPropagation(classificationVertex, entitiesToRemovePropagation);
+            }
+        } else {
+            // update blocked propagated classifications only if there is no change is tag propagation (don't update both)
+            handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
+        }
+    }
+
+    public void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedClassifications) throws AtlasBaseException {
+        if (blockedClassifications != null) {
+            List<AtlasVertex> propagatableClassifications  = getPropagatableClassifications(edge);
+            List<String>      currBlockedClassificationIds = getBlockedClassificationIds(edge);
+            List<AtlasVertex> currBlockedClassifications   = getVerticesForIds(propagatableClassifications, currBlockedClassificationIds);
+            List<AtlasVertex> classificationsToBlock       = new ArrayList<>();
+            List<String>      classificationIdsToBlock     = new ArrayList<>();
+
+            for (AtlasClassification blockedClassification : blockedClassifications) {
+                AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatableClassifications, blockedClassification);
+
+                if (classificationVertex != null) {
+                    classificationsToBlock.add(classificationVertex);
+                    classificationIdsToBlock.add(classificationVertex.getIdForDisplay());
+                }
+            }
+
+            setBlockedClassificationIds(edge, classificationIdsToBlock);
+
+            List<AtlasVertex> propagationChangedClassifications = (List<AtlasVertex>) CollectionUtils.disjunction(classificationsToBlock, currBlockedClassifications);
+
+            for (AtlasVertex classificationVertex : propagationChangedClassifications) {
+                List<AtlasVertex> propagationsToRemove = new ArrayList<>();
+                List<AtlasVertex> propagationsToAdd    = new ArrayList<>();
+
+                entityRetriever.evaluateClassificationPropagation(classificationVertex, propagationsToAdd, propagationsToRemove);
+
+                if (CollectionUtils.isNotEmpty(propagationsToAdd)) {
+                    addTagPropagation(classificationVertex, propagationsToAdd);
+                }
+
+                if (CollectionUtils.isNotEmpty(propagationsToRemove)) {
+                    removeTagPropagation(classificationVertex, propagationsToRemove);
+                }
+            }
+        }
+    }
+
+    private List<AtlasVertex> getVerticesForIds(List<AtlasVertex> vertices, List<String> vertexIds) {
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        if (CollectionUtils.isNotEmpty(vertexIds)) {
+            for (AtlasVertex vertex : vertices) {
+                String vertexId = vertex.getIdForDisplay();
+
+                if (vertexIds.contains(vertexId)) {
+                    ret.add(vertex);
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    // propagated classifications should contain blocked propagated classification
+    private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
+        AtlasVertex ret = null;
+
+        for (AtlasVertex vertex : classificationVertices) {
+            String classificationName = getClassificationName(vertex);
+            String entityGuid         = getClassificationEntityGuid(vertex);
+
+            if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) {
+                ret = vertex;
+                break;
+            }
+        }
+
+        return ret;
+    }
+
+    private void setBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
+        if (edge != null) {
+            if (classificationIds.isEmpty()) {
+                edge.removeProperty(org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
+            } else {
+                edge.setListProperty(org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
+            }
+        }
+    }
+
+    public void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid) {
+        String              currentUser = RequestContext.getCurrentUser();
+        String              entityGuid  = GraphHelper.getGuid(entityVertex);
+        Map<String, Object> taskParams  = ClassificationTask.toParameters(entityGuid, classificationVertexId, relationshipGuid);
+        AtlasTask           task        = taskManagement.createTask(taskType, currentUser, taskParams);
+
+        AtlasGraphUtilsV2.addEncodedProperty(entityVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid());
+
+        RequestContext.get().queueTask(task);
+    }
+
+    public void createAndQueueTask(String taskType, AtlasEdge relationshipEdge, AtlasRelationship relationship) {
+        String              currentUser        = RequestContext.getCurrentUser();
+        String              relationshipEdgeId = relationshipEdge.getIdForDisplay();
+        Map<String, Object> taskParams         = ClassificationTask.toParameters(relationshipEdgeId, relationship);
+        AtlasTask           task               = taskManagement.createTask(taskType, currentUser, taskParams);
+
+        AtlasGraphUtilsV2.addEncodedProperty(relationshipEdge, PENDING_TASKS_PROPERTY_KEY, task.getGuid());
+
+        RequestContext.get().queueTask(task);
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java
index c241e23..eebb40a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/HardDeleteHandlerV1.java
@@ -24,6 +24,7 @@ import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.tasks.TaskManagement;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.springframework.stereotype.Component;
 
@@ -34,8 +35,8 @@ import javax.inject.Inject;
 public class HardDeleteHandlerV1 extends DeleteHandlerV1 {
 
     @Inject
-    public HardDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
-        super(graph, typeRegistry, true, false);
+    public HardDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, TaskManagement taskManagement) {
+        super(graph, typeRegistry, true, false, taskManagement);
     }
 
     @Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java
index bede9c3..1fc7cb7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java
@@ -26,6 +26,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.tasks.TaskManagement;
 import org.apache.atlas.type.AtlasTypeRegistry;
 
 import javax.inject.Inject;
@@ -38,8 +39,8 @@ import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
 public class SoftDeleteHandlerV1 extends DeleteHandlerV1 {
 
     @Inject
-    public SoftDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
-        super(graph, typeRegistry, false, true);
+    public SoftDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, TaskManagement taskManagement) {
+        super(graph, typeRegistry, false, true, taskManagement);
     }
 
     @Override
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index b2be41c..8d4fdf3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -192,6 +192,22 @@ public class AtlasGraphUtilsV2 {
         return addProperty(vertex, propertyName, value, true);
     }
 
+    public static AtlasEdge addEncodedProperty(AtlasEdge edge, String propertyName, String value) {
+        List<String> listPropertyValues = edge.getListProperty(propertyName);
+
+        if (listPropertyValues == null) {
+            listPropertyValues = new ArrayList<>();
+        }
+
+        listPropertyValues.add(value);
+
+        edge.removeProperty(propertyName);
+
+        edge.setListProperty(propertyName, listPropertyValues);
+
+        return edge;
+    }
+
     public static AtlasVertex addProperty(AtlasVertex vertex, String propertyName, Object value, boolean isEncoded) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> addProperty({}, {}, {})", toString(vertex), propertyName, value);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
index 8d74489..a0fd71f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -18,6 +18,7 @@
  */
 package org.apache.atlas.repository.store.graph.v2;
 
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.GraphTransaction;
@@ -26,8 +27,6 @@ import org.apache.atlas.authorize.AtlasPrivilege;
 import org.apache.atlas.authorize.AtlasRelationshipAccessRequest;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
-import org.apache.atlas.model.instance.AtlasClassification;
-import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
@@ -51,7 +50,6 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.utils.AtlasPerfMetrics;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -61,7 +59,6 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -69,10 +66,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.function.Function;
 
 import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED;
-import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
 import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH;
 import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
@@ -84,13 +79,9 @@ import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
 import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
 import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
-import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
-import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
-import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;
 
 @Component
 public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
@@ -98,7 +89,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
     private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
 
     private final AtlasGraph graph;
-    private boolean notificationsEnabled = NOTIFICATION_RELATIONSHIPS_ENABLED.getBoolean();
+    private boolean notificationsEnabled    = NOTIFICATION_RELATIONSHIPS_ENABLED.getBoolean();
+    private boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
 
     private final AtlasTypeRegistry         typeRegistry;
     private final EntityGraphRetriever      entityRetriever;
@@ -401,7 +393,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
                 AtlasGraphUtilsV2.setEncodedProperty(ret, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name());
 
                 // blocked propagated classifications
-                handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
+                deleteDelegate.getHandler().handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
 
                 // propagate tags
                 deleteDelegate.getHandler().addTagPropagation(ret, tagPropagation);
@@ -459,140 +451,11 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
         return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
     }
 
-    private void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedClassifications) throws AtlasBaseException {
-        if (blockedClassifications != null) {
-            List<AtlasVertex> propagatableClassifications  = getPropagatableClassifications(edge);
-            List<String>      currBlockedClassificationIds = getBlockedClassificationIds(edge);
-            List<AtlasVertex> currBlockedClassifications   = getVerticesForIds(propagatableClassifications, currBlockedClassificationIds);
-            List<AtlasVertex> classificationsToBlock       = new ArrayList<>();
-            List<String>      classificationIdsToBlock     = new ArrayList<>();
-
-            for (AtlasClassification blockedClassification : blockedClassifications) {
-                AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatableClassifications, blockedClassification);
-
-                if (classificationVertex != null) {
-                    classificationsToBlock.add(classificationVertex);
-                    classificationIdsToBlock.add(classificationVertex.getIdForDisplay());
-                }
-            }
-
-            setBlockedClassificationIds(edge, classificationIdsToBlock);
-
-            List<AtlasVertex> propagationChangedClassifications = (List<AtlasVertex>) CollectionUtils.disjunction(classificationsToBlock, currBlockedClassifications);
-
-            for (AtlasVertex classificationVertex : propagationChangedClassifications) {
-                List<AtlasVertex> propagationsToRemove = new ArrayList<>();
-                List<AtlasVertex> propagationsToAdd    = new ArrayList<>();
-
-                entityRetriever.evaluateClassificationPropagation(classificationVertex, propagationsToAdd, propagationsToRemove);
-
-                if (CollectionUtils.isNotEmpty(propagationsToAdd)) {
-                    deleteDelegate.getHandler().addTagPropagation(classificationVertex, propagationsToAdd);
-                }
-
-                if (CollectionUtils.isNotEmpty(propagationsToRemove)) {
-                    deleteDelegate.getHandler().removeTagPropagation(classificationVertex, propagationsToRemove);
-                }
-            }
-        }
-    }
-
-    private List<AtlasVertex> getVerticesForIds(List<AtlasVertex> vertices, List<String> vertexIds) {
-        List<AtlasVertex> ret = new ArrayList<>();
-
-        if (CollectionUtils.isNotEmpty(vertexIds)) {
-            for (AtlasVertex vertex : vertices) {
-                String vertexId = vertex.getIdForDisplay();
-
-                if (vertexIds.contains(vertexId)) {
-                    ret.add(vertex);
-                }
-            }
-        }
-
-        return ret;
-    }
-
-    // propagated classifications should contain blocked propagated classification
-    private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
-        AtlasVertex ret = null;
-
-        for (AtlasVertex vertex : classificationVertices) {
-            String classificationName = getClassificationName(vertex);
-            String entityGuid         = getClassificationEntityGuid(vertex);
-
-            if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) {
-                ret = vertex;
-                break;
-            }
-        }
-
-        return ret;
-    }
-
-    private void setBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
-        if (edge != null) {
-            if (classificationIds.isEmpty()) {
-                edge.removeProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
-            } else {
-                edge.setListProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
-            }
-        }
-    }
-
-    private void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
-        PropagateTags oldTagPropagation = getPropagateTags(edge);
-        PropagateTags newTagPropagation = relationship.getPropagateTags();
-
-        if (newTagPropagation != oldTagPropagation) {
-            List<AtlasVertex>                   currentClassificationVertices = getPropagatableClassifications(edge);
-            Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap     = entityRetriever.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
-
-            // Update propagation edge
-            AtlasGraphUtilsV2.setEncodedProperty(edge, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
-
-            List<AtlasVertex>                   updatedClassificationVertices = getPropagatableClassifications(edge);
-            List<AtlasVertex>                   classificationVerticesUnion   = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
-            Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap     = entityRetriever.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
-
-            // compute add/remove propagations list
-            Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap    = new HashMap<>();
-            Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
-
-            if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) {
-                addPropagationsMap.putAll(updatedClassificationsMap);
-
-            } else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
-                removePropagationsMap.putAll(currentClassificationsMap);
-
-            } else {
-                for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
-                    List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
-                    List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
-
-                    List<AtlasVertex> entitiesAdded   = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities);
-                    List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
-
-                    if (CollectionUtils.isNotEmpty(entitiesAdded)) {
-                        addPropagationsMap.put(classificationVertex, entitiesAdded);
-                    }
-
-                    if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
-                        removePropagationsMap.put(classificationVertex, entitiesRemoved);
-                    }
-                }
-            }
-
-            for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
-                deleteDelegate.getHandler().addTagPropagation(classificationVertex, addPropagationsMap.get(classificationVertex));
-            }
-
-            for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
-                deleteDelegate.getHandler().removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex));
-            }
+    private void updateTagPropagations(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException {
+        if (DEFERRED_ACTION_ENABLED) {
+            createAndQueueTask(CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE, relationshipEdge, relationship);
         } else {
-            // update blocked propagated classifications only if there is no change is tag propagation (don't update both)
-            handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
+            deleteDelegate.getHandler().updateTagPropagations(relationshipEdge, relationship);
         }
     }
 
@@ -934,4 +797,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
             entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate);
         }
     }
+
+    private void createAndQueueTask(String taskType, AtlasEdge relationshipEdge, AtlasRelationship relationship) {
+        deleteDelegate.getHandler().createAndQueueTask(taskType, relationshipEdge, relationship);
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 2cfcc0b..d8ef32b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -18,10 +18,12 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 
+ import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContext;
+import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TimeBoundary;
 import org.apache.atlas.model.TypeCategory;
@@ -39,8 +41,8 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
-import org.apache.atlas.repository.graph.IFullTextMapper;
 import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graph.IFullTextMapper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -48,12 +50,13 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
-import org.apache.atlas.type.AtlasArrayType;
+ import org.apache.atlas.tasks.TaskManagement;
+ import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBuiltInTypes;
+import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasMapType;
-import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
@@ -73,7 +76,17 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -89,9 +102,9 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PA
 import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
 import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
 import static org.apache.atlas.repository.Constants.*;
-import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
 import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
 import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
+import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
 import static org.apache.atlas.repository.graph.GraphHelper.getDelimitedClassificationNames;
 import static org.apache.atlas.repository.graph.GraphHelper.getLabels;
 import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
@@ -107,6 +120,8 @@ import static org.apache.atlas.repository.graph.GraphHelper.string;
 import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
 
@@ -128,6 +143,7 @@ public class EntityGraphMapper {
 
     private static final boolean ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES = AtlasConfiguration.ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES.getBoolean();
     private static final boolean CLASSIFICATION_PROPAGATION_DEFAULT                  = AtlasConfiguration.CLASSIFICATION_PROPAGATION_DEFAULT.getBoolean();
+    private              boolean DEFERRED_ACTION_ENABLED                             = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
 
     private final GraphHelper               graphHelper;
     private final AtlasGraph                graph;
@@ -137,12 +153,14 @@ public class EntityGraphMapper {
     private final IAtlasEntityChangeNotifier entityChangeNotifier;
     private final AtlasInstanceConverter    instanceConverter;
     private final EntityGraphRetriever      entityRetriever;
-    private final IFullTextMapper fullTextMapperV2;
+    private final IFullTextMapper           fullTextMapperV2;
+    private final TaskManagement taskManagement;
 
     @Inject
     public EntityGraphMapper(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, AtlasGraph graph,
                              AtlasRelationshipStore relationshipStore, IAtlasEntityChangeNotifier entityChangeNotifier,
-                             AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2) {
+                             AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2,
+                             TaskManagement taskManagement) {
         this.graphHelper          = new GraphHelper(graph);
         this.deleteDelegate       = deleteDelegate;
         this.typeRegistry         = typeRegistry;
@@ -152,6 +170,12 @@ public class EntityGraphMapper {
         this.instanceConverter    = instanceConverter;
         this.entityRetriever      = new EntityGraphRetriever(graph, typeRegistry);
         this.fullTextMapperV2     = fullTextMapperV2;
+        this.taskManagement       = taskManagement;
+    }
+
+    @VisibleForTesting
+    public void setTasksUseFlag(boolean value) {
+        DEFERRED_ACTION_ENABLED = value;
     }
 
     public AtlasVertex createVertex(AtlasEntity entity) throws AtlasBaseException {
@@ -1943,6 +1967,12 @@ public class EntityGraphMapper {
                     LOG.debug("created vertex {} for trait {}", string(classificationVertex), classificationName);
                 }
 
+                if (propagateTags && taskManagement != null && DEFERRED_ACTION_ENABLED) {
+                    propagateTags = false;
+
+                    createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, entityVertex, classificationVertex.getIdForDisplay());
+                }
+
                 // add the attributes for the trait instance
                 mapClassification(EntityOperation.CREATE, context, classification, entityType, entityVertex, classificationVertex);
                 updateModificationMetadata(entityVertex);
@@ -2000,6 +2030,58 @@ public class EntityGraphMapper {
         }
     }
 
+    @GraphTransaction
+    public List<String> propagateClassification(String entityGuid, String classificationVertexId, String relationshipGuid) throws AtlasBaseException {
+        try {
+            if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) {
+                LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);
+
+                return null;
+            }
+
+            AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
+
+            if (entityVertex == null) {
+                LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
+
+                return null;
+            }
+
+            AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
+
+            if (classificationVertex == null) {
+                LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
+
+                return null;
+            }
+
+            List<AtlasVertex> impactedVertices = entityRetriever.getImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId);
+
+            if (CollectionUtils.isEmpty(impactedVertices)) {
+                LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId);
+
+                return null;
+            }
+
+            AtlasClassification classification       = entityRetriever.toAtlasClassification(classificationVertex);
+            List<AtlasVertex>   entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, impactedVertices);
+
+            if (CollectionUtils.isEmpty(entitiesPropagatedTo)) {
+                return null;
+            }
+
+            List<AtlasEntity> propagatedEntities = updateClassificationText(classification, entitiesPropagatedTo);
+
+            entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
+
+            return propagatedEntities.stream().map(x -> x.getGuid()).collect(Collectors.toList());
+        } catch (Exception e) {
+            LOG.error("propagateClassification(entityGuid={}, classificationVertexId={}): error while propagating classification", entityGuid, classificationVertexId, e);
+
+            throw new AtlasBaseException(e);
+        }
+    }
+
     public void deleteClassification(String entityGuid, String classificationName, String associatedEntityGuid) throws AtlasBaseException {
         if (StringUtils.isEmpty(associatedEntityGuid) || associatedEntityGuid.equals(entityGuid)) {
             deleteClassification(entityGuid, classificationName);
@@ -2058,10 +2140,16 @@ public class EntityGraphMapper {
         final List<AtlasVertex> entityVertices;
 
         if (isPropagationEnabled(classificationVertex)) {
-            entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
+            if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
+                createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, entityVertex, classificationVertex.getIdForDisplay());
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Number of propagations to delete -> {}", entityVertices.size());
+                entityVertices = new ArrayList<>();
+            } else {
+                entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Number of propagations to delete -> {}", entityVertices.size());
+                }
             }
         } else {
             entityVertices = new ArrayList<>();
@@ -2267,6 +2355,27 @@ public class EntityGraphMapper {
             Boolean currentTagPropagation = currentClassification.isPropagate();
             Boolean updatedTagPropagation = classification.isPropagate();
 
+            /* -----------------------------
+               | Current Tag | Updated Tag |
+               | Propagation | Propagation |
+               |-------------|-------------|
+               |   true      |    true     | => no-op
+               |-------------|-------------|
+               |   false     |    false    | => no-op
+               |-------------|-------------|
+               |   false     |    true     | => Add Tag Propagation (send ADD classification notifications)
+               |-------------|-------------|
+               |   true      |    false    | => Remove Tag Propagation (send REMOVE classification notifications)
+               |-------------|-------------| */
+
+            if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
+                String propagationType = updatedTagPropagation ? CLASSIFICATION_PROPAGATION_ADD : CLASSIFICATION_PROPAGATION_DELETE;
+
+                createAndQueueTask(propagationType, entityVertex, classificationVertex.getIdForDisplay());
+
+                updatedTagPropagation = null;
+            }
+
             // compute propagatedEntityVertices once and use it for subsequent iterations and notifications
             if (updatedTagPropagation != null && currentTagPropagation != updatedTagPropagation) {
                 if (updatedTagPropagation) {
@@ -2340,6 +2449,63 @@ public class EntityGraphMapper {
         AtlasPerfTracer.log(perf);
     }
 
+    @GraphTransaction
+    public List<String> updateClassificationsPropagation(String entityGuid, String classificationVertexId, String relationshipGuid) throws AtlasBaseException {
+        try {
+            if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) {
+                LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
+
+            if (entityVertex == null) {
+                LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
+
+            if (classificationVertex == null) {
+                LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            List<AtlasVertex> entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertex.getIdForDisplay());
+
+            if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
+                LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no impacted vertices found!", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo);
+
+            if (CollectionUtils.isEmpty(entitiesPropagatedTo)) {
+                LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no propagations added!", entityGuid, classificationVertexId);
+                return null;
+            }
+
+            AtlasClassification updatedClassification = entityRetriever.toAtlasClassification(classificationVertex);
+            List<String>        ret                   = new ArrayList<>();
+
+            for (AtlasVertex vertex : entitiesToPropagateTo) {
+                AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
+
+                ret.add(entity.getGuid());
+
+                if (isActive(entity)) {
+                    vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+
+                    entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(updatedClassification));
+                }
+            }
+
+            return ret;
+        } catch (Exception ex) {
+            throw new AtlasBaseException(ex);
+        }
+    }
+
     private AtlasEdge mapClassification(EntityOperation operation,  final EntityMutationContext context, AtlasClassification classification,
                                         AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
                                         throws AtlasBaseException {
@@ -2387,6 +2553,50 @@ public class EntityGraphMapper {
         }
     }
 
+    @GraphTransaction
+    public List<String> deleteClassificationPropagation(String classificationVertexId) throws AtlasBaseException {
+        try {
+            if (StringUtils.isEmpty(classificationVertexId)) {
+                LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex id is empty", classificationVertexId);
+
+                return null;
+            }
+
+            AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
+
+            if (classificationVertex == null) {
+                LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex not found", classificationVertexId);
+
+                return null;
+            }
+
+            List<AtlasVertex> entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
+
+            if (CollectionUtils.isEmpty(entityVertices)) {
+
+                return null;
+            }
+
+            AtlasClassification classification     = entityRetriever.toAtlasClassification(classificationVertex);
+            List<AtlasEntity>   propagatedEntities = updateClassificationText(classification, entityVertices);
+
+            entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification));
+
+            return propagatedEntities.stream().map(x -> x.getGuid()).collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new AtlasBaseException(e);
+        }
+    }
+
+    @GraphTransaction
+    public void updateTagPropagations(String relationshipEdgeId, AtlasRelationship relationship) throws AtlasBaseException {
+        AtlasEdge relationshipEdge = graph.getEdge(relationshipEdgeId);
+
+        deleteDelegate.getHandler().updateTagPropagations(relationshipEdge, relationship);
+
+        entityChangeNotifier.notifyPropagatedEntities();
+    }
+
     private void validateClassificationExists(List<String> existingClassifications, List<String> suppliedClassifications) throws AtlasBaseException {
         Set<String> existingNames = new HashSet<>(existingClassifications);
         for (String classificationName : suppliedClassifications) {
@@ -2608,4 +2818,8 @@ public class EntityGraphMapper {
 
         attributes.put(bmAttribute.getName(), attrValue);
     }
-}
+
+    private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId) {
+        deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null);
+    }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 8208d11..b790023 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -68,6 +68,7 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -79,6 +80,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -116,6 +118,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
 
 @Component
 public class EntityGraphRetriever {
@@ -501,7 +504,7 @@ public class EntityGraphRetriever {
     public List<AtlasVertex> getImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude) {
         List<AtlasVertex> ret = new ArrayList<>();
 
-        traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, new HashSet<>(), ret);
+        traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, ret);
 
         return ret;
     }
@@ -509,7 +512,7 @@ public class EntityGraphRetriever {
     public List<AtlasVertex> getIncludedImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude) {
         List<AtlasVertex> ret = new ArrayList<>(Arrays.asList(entityVertex));
 
-        traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, new HashSet<>(), ret);
+        traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, ret);
 
         return ret;
     }
@@ -517,20 +520,37 @@ public class EntityGraphRetriever {
     public List<AtlasVertex> getImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude, String classificationId) {
         List<AtlasVertex> ret = new ArrayList<>();
 
-        traverseImpactedVertices(entityVertex, relationshipGuidToExclude, classificationId, new HashSet<>(), ret);
+        traverseImpactedVertices(entityVertex, relationshipGuidToExclude, classificationId, ret);
 
         return ret;
     }
 
-    private void traverseImpactedVertices(AtlasVertex entityVertex, String relationshipGuidToExclude, String classificationId, Set<String> visitedVertices, List<AtlasVertex> result) {
-        visitedVertices.add(entityVertex.getIdForDisplay());
+    private void traverseImpactedVertices(final AtlasVertex entityVertexStart, final String relationshipGuidToExclude,
+                                          final String classificationId, final List<AtlasVertex> result) {
+        Set<String> visitedVertices = new HashSet<>();
 
-        AtlasEntityType entityType          = typeRegistry.getEntityTypeByName(getTypeName(entityVertex));
-        String[]        tagPropagationEdges = entityType != null ? entityType.getTagPropagationEdgesArray() : null;
+        Queue<AtlasVertex> queue = new ArrayDeque<AtlasVertex>() {{ add(entityVertexStart); }};
 
-        if (tagPropagationEdges != null) {
-            Iterable<AtlasEdge> propagationEdges = entityVertex.getEdges(AtlasEdgeDirection.BOTH, tagPropagationEdges);
+        while (!queue.isEmpty()) {
+            AtlasVertex entityVertex   = queue.poll();
+            String      entityVertexId = entityVertex.getIdForDisplay();
+
+            if (visitedVertices.contains(entityVertexId)) {
+                LOG.info("Already visited: {}", entityVertexId);
+
+                continue;
+            }
+
+            visitedVertices.add(entityVertexId);
 
+            AtlasEntityType entityType          = typeRegistry.getEntityTypeByName(getTypeName(entityVertex));
+            String[]        tagPropagationEdges = entityType != null ? entityType.getTagPropagationEdgesArray() : null;
+
+            if (tagPropagationEdges == null) {
+                continue;
+            }
+
+            Iterable<AtlasEdge> propagationEdges = entityVertex.getEdges(AtlasEdgeDirection.BOTH, tagPropagationEdges);
             for (AtlasEdge propagationEdge : propagationEdges) {
                 if (getEdgeStatus(propagationEdge) != ACTIVE) {
                     continue;
@@ -540,7 +560,7 @@ public class EntityGraphRetriever {
 
                 if (tagPropagation == null || tagPropagation == NONE) {
                     continue;
-                } else if (tagPropagation == TWO_TO_ONE)  {
+                } else if (tagPropagation == TWO_TO_ONE) {
                     if (isOutVertex(entityVertex, propagationEdge)) {
                         continue;
                     }
@@ -558,18 +578,18 @@ public class EntityGraphRetriever {
 
                 if (classificationId != null) {
                     List<String> blockedClassificationIds = getBlockedClassificationIds(propagationEdge);
-
                     if (CollectionUtils.isNotEmpty(blockedClassificationIds) && blockedClassificationIds.contains(classificationId)) {
                         continue;
                     }
                 }
 
-                AtlasVertex adjacentVertex = getOtherVertex(propagationEdge, entityVertex);
+                AtlasVertex adjacentVertex             = getOtherVertex(propagationEdge, entityVertex);
+                String      adjacentVertexIdForDisplay = adjacentVertex.getIdForDisplay();
 
-                if (!visitedVertices.contains(adjacentVertex.getIdForDisplay())) {
+                if (!visitedVertices.contains(adjacentVertexIdForDisplay)) {
                     result.add(adjacentVertex);
 
-                    traverseImpactedVertices(adjacentVertex, relationshipGuidToExclude, classificationId, visitedVertices, result);
+                    queue.add(adjacentVertex);
                 }
             }
         }
@@ -768,25 +788,32 @@ public class EntityGraphRetriever {
             LOG.debug("Mapping system attributes for type {}", entity.getTypeName());
         }
 
-        entity.setGuid(getGuid(entityVertex));
-        entity.setTypeName(getTypeName(entityVertex));
-        entity.setStatus(GraphHelper.getStatus(entityVertex));
-        entity.setVersion(GraphHelper.getVersion(entityVertex));
+        try {
+            if (entityVertex != null) {
+                entity.setGuid(getGuid(entityVertex));
+                entity.setTypeName(getTypeName(entityVertex));
+                entity.setStatus(GraphHelper.getStatus(entityVertex));
+                entity.setVersion(GraphHelper.getVersion(entityVertex));
 
-        entity.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex));
-        entity.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex));
+                entity.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex));
+                entity.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex));
 
-        entity.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex)));
-        entity.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex)));
+                entity.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex)));
+                entity.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex)));
 
-        entity.setHomeId(GraphHelper.getHomeId(entityVertex));
+                entity.setHomeId(GraphHelper.getHomeId(entityVertex));
 
-        entity.setIsProxy(GraphHelper.isProxy(entityVertex));
-        entity.setIsIncomplete(isEntityIncomplete(entityVertex));
+                entity.setIsProxy(GraphHelper.isProxy(entityVertex));
+                entity.setIsIncomplete(isEntityIncomplete(entityVertex));
 
-        entity.setProvenanceType(GraphHelper.getProvenanceType(entityVertex));
-        entity.setCustomAttributes(getCustomAttributes(entityVertex));
-        entity.setLabels(getLabels(entityVertex));
+                entity.setProvenanceType(GraphHelper.getProvenanceType(entityVertex));
+                entity.setCustomAttributes(getCustomAttributes(entityVertex));
+                entity.setLabels(getLabels(entityVertex));
+                entity.setPendingTasks(getPendingTasks(entityVertex));
+            }
+        } catch (Throwable t) {
+            LOG.warn("Got exception while mapping system attributes for type {} : ", entity.getTypeName(), t);
+        }
 
         return entity;
     }
@@ -1660,4 +1687,14 @@ public class EntityGraphRetriever {
             relationship.setAttribute(attribute.getName(), attrValue);
         }
     }
+
+    private Set<String> getPendingTasks(AtlasVertex entityVertex) {
+        Collection<String> ret = entityVertex.getPropertyValues(PENDING_TASKS_PROPERTY_KEY, String.class);
+
+        if (CollectionUtils.isEmpty(ret)) {
+            return null;
+        }
+
+        return new HashSet<>(ret);
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
index fe8699d..d6f23d6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -123,12 +123,12 @@ public class MigrationImport extends ImportStrategy {
     private AtlasEntityStoreV2 createEntityStore(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
         FullTextMapperV2Nop fullTextMapperV2 = new FullTextMapperV2Nop();
         IAtlasEntityChangeNotifier entityChangeNotifier = new EntityChangeNotifierNop();
-        DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(graph, typeRegistry);
+        DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(graph, typeRegistry, null);
         AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
 
         AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(graph, typeRegistry, formatConverters);
         AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(graph, typeRegistry, deleteDelegate, entityChangeNotifier);
-        EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, graph, relationshipStore, entityChangeNotifier, instanceConverter, fullTextMapperV2);
+        EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, graph, relationshipStore, entityChangeNotifier, instanceConverter, fullTextMapperV2, null);
 
         return new AtlasEntityStoreV2(graph, deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper);
     }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
new file mode 100644
index 0000000..6244b2d
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.tasks.TaskFactory;
+import org.apache.atlas.tasks.TaskManagement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class ClassificationPropagateTaskFactory implements TaskFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(ClassificationPropagateTaskFactory.class);
+
+    public static final String CLASSIFICATION_PROPAGATION_ADD                 = "CLASSIFICATION_PROPAGATION_ADD";
+    public static final String CLASSIFICATION_PROPAGATION_UPDATE              = "CLASSIFICATION_PROPAGATION_UPDATE";
+    public static final String CLASSIFICATION_PROPAGATION_DELETE              = "CLASSIFICATION_PROPAGATION_DELETE";
+    public static final String CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE = "CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE";
+
+    private static final List<String> supportedTypes = new ArrayList<String>() {{
+        add(CLASSIFICATION_PROPAGATION_ADD);
+        add(CLASSIFICATION_PROPAGATION_UPDATE);
+        add(CLASSIFICATION_PROPAGATION_DELETE);
+        add(CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE);
+    }};
+
+    private final AtlasGraph             graph;
+    private final EntityGraphMapper      entityGraphMapper;
+    private final DeleteHandlerDelegate  deleteDelegate;
+    private final AtlasRelationshipStore relationshipStore;
+
+    @Inject
+    public ClassificationPropagateTaskFactory(TaskManagement taskManagement, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+        this.graph             = graph;
+        this.entityGraphMapper = entityGraphMapper;
+        this.deleteDelegate    = deleteDelegate;
+        this.relationshipStore = relationshipStore;
+
+        taskManagement.addFactory(this);
+    }
+
+    public org.apache.atlas.tasks.AbstractTask create(AtlasTask task) {
+        String taskType = task.getType();
+        String taskGuid = task.getGuid();
+
+        switch (taskType) {
+            case CLASSIFICATION_PROPAGATION_ADD:
+                return new ClassificationPropagationTasks.Add(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+
+            case CLASSIFICATION_PROPAGATION_UPDATE:
+                return new ClassificationPropagationTasks.Update(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+
+            case CLASSIFICATION_PROPAGATION_DELETE:
+                return new ClassificationPropagationTasks.Delete(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+
+            case CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE:
+                return new ClassificationPropagationTasks.UpdateRelationship(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+
+            default:
+                LOG.warn("Type: {} - {} not found!. The task will be ignored.", taskType, taskGuid);
+                return null;
+        }
+    }
+
+    @Override
+    public List<String> getSupportedTypes() {
+        return this.supportedTypes;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
new file mode 100644
index 0000000..4fda34a
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.type.AtlasType;
+
+import java.util.Map;
+
+public class ClassificationPropagationTasks {
+    public static class Add extends ClassificationTask {
+        public Add(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+            super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+        }
+
+        @Override
+        protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+            String entityGuid             = (String) parameters.get(PARAM_ENTITY_GUID);
+            String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
+            String relationshipGuid       = (String) parameters.get(PARAM_RELATIONSHIP_GUID);
+
+            entityGraphMapper.propagateClassification(entityGuid, classificationVertexId, relationshipGuid);
+        }
+    }
+
+    public static class Update extends ClassificationTask {
+        public Update(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+            super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+        }
+
+        @Override
+        protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+            String entityGuid             = (String) parameters.get(PARAM_ENTITY_GUID);
+            String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
+            String relationshipGuid       = (String) parameters.get(PARAM_RELATIONSHIP_GUID);
+
+            entityGraphMapper.updateClassificationsPropagation(entityGuid, classificationVertexId, relationshipGuid);
+        }
+    }
+
+    public static class Delete extends ClassificationTask {
+        public Delete(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+            super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+        }
+
+        @Override
+        protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+            String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
+
+            entityGraphMapper.deleteClassificationPropagation(classificationVertexId);
+        }
+    }
+
+    public static class UpdateRelationship extends ClassificationTask {
+        public UpdateRelationship(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+            super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
+        }
+
+        @Override
+        protected void run(Map<String, Object> parameters) throws AtlasBaseException {
+            String            relationshipEdgeId = (String) parameters.get(PARAM_RELATIONSHIP_EDGE_ID);
+            AtlasRelationship relationship       = AtlasType.fromJson((String) parameters.get(PARAM_RELATIONSHIP_OBJECT), AtlasRelationship.class);
+
+            entityGraphMapper.updateTagPropagations(relationshipEdgeId, relationship);
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
new file mode 100644
index 0000000..369db08
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
+import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
+import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;
+import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
+
+public abstract class ClassificationTask extends AbstractTask {
+    private static final Logger LOG = LoggerFactory.getLogger(ClassificationTask.class);
+
+    protected static final String PARAM_ENTITY_GUID              = "entityGuid";
+    protected static final String PARAM_CLASSIFICATION_VERTEX_ID = "classificationVertexId";
+    protected static final String PARAM_RELATIONSHIP_GUID        = "relationshipGuid";
+    protected static final String PARAM_RELATIONSHIP_OBJECT      = "relationshipObject";
+    protected static final String PARAM_RELATIONSHIP_EDGE_ID     = "relationshipEdgeId";
+
+    protected final AtlasGraph             graph;
+    protected final EntityGraphMapper      entityGraphMapper;
+    protected final DeleteHandlerDelegate  deleteDelegate;
+    protected final AtlasRelationshipStore relationshipStore;
+
+    public ClassificationTask(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
+        super(task);
+
+        this.graph             = graph;
+        this.entityGraphMapper = entityGraphMapper;
+        this.deleteDelegate    = deleteDelegate;
+        this.relationshipStore = relationshipStore;
+    }
+
+    @Override
+    public AtlasTask.Status perform() throws Exception {
+        Map<String, Object> params = getTaskDef().getParameters();
+
+        if (MapUtils.isEmpty(params)) {
+            LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid());
+
+            return FAILED;
+        }
+
+        String userName = getTaskDef().getCreatedBy();
+
+        if (StringUtils.isEmpty(userName)) {
+            LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid());
+
+            return FAILED;
+        }
+
+        RequestContext.get().setUser(userName, null);
+
+        try {
+            run(params);
+
+            setStatus(COMPLETE);
+        } catch (Exception e) {
+            LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
+
+            setStatus(FAILED);
+
+            throw e;
+        } finally {
+            graph.commit();
+        }
+
+        return getStatus();
+    }
+
+    public static Map<String, Object> toParameters(String entityGuid, String classificationVertexId, String relationshipGuid) {
+        return new HashMap<String, Object>() {{
+            put(PARAM_ENTITY_GUID, entityGuid);
+            put(PARAM_CLASSIFICATION_VERTEX_ID, classificationVertexId);
+            put(PARAM_RELATIONSHIP_GUID, relationshipGuid);
+        }};
+    }
+
+    public static Map<String, Object> toParameters(String relationshipEdgeId, AtlasRelationship relationship) {
+        return new HashMap<String, Object>() {{
+            put(PARAM_RELATIONSHIP_EDGE_ID, relationshipEdgeId);
+            put(PARAM_RELATIONSHIP_OBJECT, AtlasType.toJson(relationship));
+        }};
+    }
+
+    protected void setStatus(AtlasTask.Status status) {
+        super.setStatus(status);
+
+        // remove pending task guid from entity vertex or relationship edge
+        AtlasElement element;
+
+        if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) {
+            element = graph.getEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID));
+
+        } else {
+            element = AtlasGraphUtilsV2.findByGuid((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID));
+        }
+
+        element.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, getTaskGuid());
+    }
+
+    protected abstract void run(Map<String, Object> parameters) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java b/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java
new file mode 100644
index 0000000..33e1d6a
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/AbstractTask.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.model.tasks.AtlasTask;
+
+import static org.apache.atlas.model.tasks.AtlasTask.Status;
+
+public abstract class AbstractTask {
+    private final AtlasTask task;
+
+    public AbstractTask(AtlasTask task) {
+        this.task = task;
+    }
+
+    public void run() throws Exception {
+        try {
+            perform();
+        } catch (Exception exception) {
+            task.setStatusPending();
+
+            task.setErrorMessage(exception.getMessage());
+
+            task.incrementAttemptCount();
+
+            throw exception;
+        } finally {
+            task.end();
+        }
+    }
+
+    protected void setStatus(Status status) {
+        task.setStatus(status);
+    }
+
+    public Status getStatus() {
+        return this.task.getStatus();
+    }
+
+    public String getTaskGuid() {
+        return task.getGuid();
+    }
+
+    public String getTaskType() {
+        return task.getType();
+    }
+
+    protected AtlasTask getTaskDef() {
+        return this.task;
+    }
+
+    public abstract Status perform() throws Exception;
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java
new file mode 100644
index 0000000..81957d8
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class TaskExecutor {
+    private static final Logger     LOG              = LoggerFactory.getLogger(TaskExecutor.class);
+    private static final TaskLogger TASK_LOG         = TaskLogger.getLogger();
+    private static final String     TASK_NAME_FORMAT = "atlas-task-%d-";
+
+    private final TaskRegistry              registry;
+    private final Map<String, TaskFactory>  taskTypeFactoryMap;
+    private final TaskManagement.Statistics statistics;
+    private final ExecutorService           executorService;
+
+    public TaskExecutor(TaskRegistry registry, Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics) {
+        this.registry           = registry;
+        this.taskTypeFactoryMap = taskTypeFactoryMap;
+        this.statistics         = statistics;
+        this.executorService    = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+                                                                    .setDaemon(true)
+                                                                    .setNameFormat(TASK_NAME_FORMAT + Thread.currentThread().getName())
+                                                                    .build());
+    }
+
+    public void addAll(List<AtlasTask> tasks) {
+        for (AtlasTask task : tasks) {
+            if (task == null) {
+                continue;
+            }
+
+            TASK_LOG.log(task);
+
+            this.executorService.submit(new TaskConsumer(task, this.registry, this.taskTypeFactoryMap, this.statistics));
+        }
+    }
+
+    @VisibleForTesting
+    void waitUntilDone() throws InterruptedException {
+        Thread.sleep(5000);
+    }
+
+    static class TaskConsumer implements Runnable {
+        private static final int MAX_ATTEMPT_COUNT = 3;
+
+        private final Map<String, TaskFactory>  taskTypeFactoryMap;
+        private final TaskRegistry              registry;
+        private final TaskManagement.Statistics statistics;
+        private final AtlasTask                 task;
+
+        public TaskConsumer(AtlasTask task, TaskRegistry registry, Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics) {
+            this.task               = task;
+            this.registry           = registry;
+            this.taskTypeFactoryMap = taskTypeFactoryMap;
+            this.statistics         = statistics;
+        }
+
+        @Override
+        public void run() {
+            AtlasVertex taskVertex = null;
+            int         attemptCount;
+
+            try {
+                taskVertex = registry.getVertex(task.getGuid());
+
+                if (task == null || taskVertex == null || task.getStatus() == AtlasTask.Status.COMPLETE) {
+                    TASK_LOG.warn("Task not scheduled as it was not found or status was COMPLETE!", task);
+
+                    return;
+                }
+
+                statistics.increment(1);
+
+                attemptCount = task.getAttemptCount();
+
+                if (attemptCount >= MAX_ATTEMPT_COUNT) {
+                    TASK_LOG.warn("Max retry count for task exceeded! Skipping!", task);
+
+                    return;
+                }
+
+                performTask(taskVertex, task);
+            } catch (InterruptedException exception) {
+                if (task != null) {
+                    registry.updateStatus(taskVertex, task);
+
+                    TASK_LOG.error("{}: {}: Interrupted!", task, exception);
+                } else {
+                    LOG.error("Interrupted!", exception);
+                }
+
+                statistics.error();
+            } catch (Exception exception) {
+                if (task != null) {
+                    task.updateStatusFromAttemptCount();
+
+                    registry.updateStatus(taskVertex, task);
+
+                    TASK_LOG.error("Error executing task. Please perform the operation again!", task, exception);
+                } else {
+                    LOG.error("Error executing. Please perform the operation again!", exception);
+                }
+
+                statistics.error();
+            } finally {
+                if (task != null) {
+                    this.registry.commit();
+
+                    TASK_LOG.log(task);
+                }
+            }
+        }
+
+        private void performTask(AtlasVertex taskVertex, AtlasTask task) throws Exception {
+            TaskFactory  factory      = taskTypeFactoryMap.get(task.getType());
+            if (factory == null) {
+                LOG.error("taskTypeFactoryMap does not contain task of type: {}", task.getType());
+                return;
+            }
+
+            AbstractTask runnableTask = factory.create(task);
+
+            runnableTask.run();
+
+            registry.deleteComplete(taskVertex, task);
+
+            statistics.successPrint();
+        }
+    }
+
+    static class TaskLogger {
+        private static final Logger LOG = LoggerFactory.getLogger("TASKS");
+
+        public static TaskLogger getLogger() {
+            return new TaskLogger();
+        }
+
+        public void info(String message) {
+            LOG.info(message);
+        }
+
+        public void log(AtlasTask task) {
+            LOG.info(AtlasType.toJson(task));
+        }
+
+        public void warn(String message, AtlasTask task) {
+            LOG.warn(message, AtlasType.toJson(task));
+        }
+
+        public void error(String s, AtlasTask task, Exception exception) {
+            LOG.error(s, AtlasType.toJson(task), exception);
+        }
+    }
+}
\ No newline at end of file
diff --git a/webapp/src/main/java/org/apache/atlas/BeanUtil.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java
similarity index 56%
copy from webapp/src/main/java/org/apache/atlas/BeanUtil.java
copy to repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java
index ef5a741..47ce1fc 100644
--- a/webapp/src/main/java/org/apache/atlas/BeanUtil.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java
@@ -15,24 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.atlas.tasks;
+import org.apache.atlas.model.tasks.AtlasTask;
 
-package org.apache.atlas;
+import java.util.List;
 
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.stereotype.Component;
+public interface TaskFactory {
+    /**
+     * Creates a concrete task using the task definition.
+     * @param atlasTask
+     * @return
+     */
+    AbstractTask create(AtlasTask atlasTask);
 
-@Component
-public class BeanUtil implements ApplicationContextAware {
-    private static ApplicationContext context;
-
-    @Override
-    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        context = applicationContext;
-    }
-
-    public static <T> T getBean(Class<T> beanClass) {
-        return context.getBean(beanClass);
-    }
+    List<String> getSupportedTypes();
 }
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
new file mode 100644
index 0000000..264aa8c
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.service.Service;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Component
+@Order(7)
+public class TaskManagement implements Service, ActiveStateChangeHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(TaskManagement.class);
+
+    private final ThreadLocal<TaskExecutor> taskExecutorThreadLocal = new ThreadLocal<>();
+    private final Configuration             configuration;
+    private final TaskRegistry              registry;
+    private final Statistics                statistics;
+    private final Map<String, TaskFactory>  taskTypeFactoryMap;
+
+    @Inject
+    public TaskManagement(Configuration configuration, TaskRegistry taskRegistry) {
+        this.configuration      = configuration;
+        this.registry           = taskRegistry;
+        this.statistics         = new Statistics();
+        this.taskTypeFactoryMap = new HashMap<>();
+    }
+
+    @VisibleForTesting
+    TaskManagement(Configuration configuration, TaskRegistry taskRegistry, TaskFactory taskFactory) {
+        this.configuration      = configuration;
+        this.registry           = taskRegistry;
+        this.statistics         = new Statistics();
+        this.taskTypeFactoryMap = new HashMap<>();
+
+        createTaskTypeFactoryMap(taskTypeFactoryMap, taskFactory);
+    }
+
+    @Override
+    public void start() throws AtlasException {
+        if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) {
+            startInternal();
+        } else {
+            LOG.info("TaskManagement.start(): deferring patches until instance activation");
+        }
+    }
+
+    @Override
+    public void stop() throws AtlasException {
+        LOG.info("TaskManagement: Stopped!");
+    }
+
+    @Override
+    public void instanceIsActive() throws AtlasException {
+        LOG.info("==> TaskManagement.instanceIsActive()");
+
+        startInternal();
+
+        LOG.info("<== TaskManagement.instanceIsActive()");
+    }
+
+    @Override
+    public void instanceIsPassive() throws AtlasException {
+        LOG.info("TaskManagement.instanceIsPassive(): no action needed");
+    }
+
+    @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.TASK_MANAGEMENT.getOrder();
+    }
+
+    public void addFactory(TaskFactory taskFactory) {
+        createTaskTypeFactoryMap(this.taskTypeFactoryMap, taskFactory);
+    }
+
+    public AtlasTask createTask(String taskType, String createdBy, Map<String, Object> parameters) {
+        return this.registry.createVertex(taskType, createdBy, parameters);
+    }
+
+    public List<AtlasTask> getAll() {
+        return this.registry.getAll();
+    }
+
+    public void addAll(List<AtlasTask> tasks) {
+        if (CollectionUtils.isEmpty(tasks)) {
+            return;
+        }
+
+        dispatchTasks(tasks);
+    }
+
+    public AtlasTask getByGuid(String guid) throws AtlasBaseException {
+        try {
+            return this.registry.getById(guid);
+        } catch (Exception exception) {
+            LOG.error("Error: getByGuid: {}", guid);
+
+            throw new AtlasBaseException(exception);
+        }
+    }
+
+    public List<AtlasTask> getByGuids(List<String> guids) throws AtlasBaseException {
+        List<AtlasTask> ret = new ArrayList<>();
+
+        for (String guid : guids) {
+            AtlasTask task = getByGuid(guid);
+
+            if (task != null) {
+                ret.add(task);
+            }
+        }
+
+        return ret;
+    }
+
+    public void deleteByGuid(String guid) throws AtlasBaseException {
+        try {
+            this.registry.deleteByGuid(guid);
+        } catch (Exception exception) {
+            throw new AtlasBaseException(exception);
+        }
+    }
+
+    public void deleteByGuids(List<String> guids) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(guids)) {
+            return;
+        }
+
+        for (String guid : guids) {
+            this.registry.deleteByGuid(guid);
+        }
+    }
+
+    private void dispatchTasks(List<AtlasTask> tasks) {
+        if (CollectionUtils.isEmpty(tasks)) {
+            return;
+        }
+
+        if (this.taskExecutorThreadLocal.get() == null) {
+            this.taskExecutorThreadLocal.set(new TaskExecutor(registry, taskTypeFactoryMap, statistics));
+        }
+
+        this.taskExecutorThreadLocal.get().addAll(tasks);
+
+        this.statistics.print();
+    }
+
+    private void startInternal() {
+        if (AtlasConfiguration.TASKS_USE_ENABLED.getBoolean() == false) {
+            return;
+        }
+
+        LOG.info("TaskManagement: Started!");
+    }
+
+    public void queuePendingTasks() {
+        if (AtlasConfiguration.TASKS_USE_ENABLED.getBoolean() == false) {
+            return;
+        }
+
+        List<AtlasTask> pendingTasks = this.registry.getPendingTasks();
+
+        LOG.info("TaskManagement: Found: {}: Tasks in pending state.", pendingTasks.size());
+
+        addAll(pendingTasks);
+    }
+
+    @VisibleForTesting
+    static Map<String, TaskFactory> createTaskTypeFactoryMap(Map<String, TaskFactory> taskTypeFactoryMap, TaskFactory factory) {
+        List<String> supportedTypes = factory.getSupportedTypes();
+
+        if (CollectionUtils.isEmpty(supportedTypes)) {
+            LOG.warn("{}: Supported types returned empty!", factory.getClass());
+
+            return taskTypeFactoryMap;
+        }
+
+        for (String type : supportedTypes) {
+            taskTypeFactoryMap.put(type, factory);
+        }
+
+        return taskTypeFactoryMap;
+    }
+
+    static class Statistics {
+        private static final TaskExecutor.TaskLogger logger = TaskExecutor.TaskLogger.getLogger();
+        private static final long REPORT_FREQUENCY = 30000L;
+
+        private final AtomicInteger total               = new AtomicInteger(0);
+        private final AtomicInteger countSinceLastCheck = new AtomicInteger(0);
+        private final AtomicInteger totalWithErrors     = new AtomicInteger(0);
+        private final AtomicInteger totalSucceed        = new AtomicInteger(0);
+        private       long          lastCheckTime       = System.currentTimeMillis();
+
+        public void error() {
+            this.countSinceLastCheck.incrementAndGet();
+            this.totalWithErrors.incrementAndGet();
+        }
+
+        public void success() {
+            this.countSinceLastCheck.incrementAndGet();
+            this.totalSucceed.incrementAndGet();
+        }
+
+        public void increment() {
+            increment(1);
+        }
+
+        public void increment(int delta) {
+            this.total.addAndGet(delta);
+            this.countSinceLastCheck.addAndGet(delta);
+        }
+
+        public void print() {
+            long now = System.currentTimeMillis();
+            long diff = now - this.lastCheckTime;
+
+            if (diff < REPORT_FREQUENCY) {
+                return;
+            }
+
+            logger.info(String.format("TaskManagement: Processing stats: total=%d, sinceLastStatsReport=%d completedWithErrors=%d, succeded=%d",
+                                       this.total.get(), this.countSinceLastCheck.getAndSet(0),
+                                       this.totalWithErrors.get(), this.totalSucceed.get()));
+            this.lastCheckTime = now;
+        }
+
+        public void successPrint() {
+            success();
+            print();
+        }
+
+        @VisibleForTesting
+        int getTotal() {
+            return this.total.get();
+        }
+
+        @VisibleForTesting
+        int getTotalSuccess() {
+            return this.totalSucceed.get();
+        }
+
+        @VisibleForTesting
+        int getTotalError() {
+            return this.totalWithErrors.get();
+        }
+    }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
new file mode 100644
index 0000000..fae8a4f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.utils.AtlasJson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.Constants.TASK_GUID;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+
+@Component
+public class TaskRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
+
+    private AtlasGraph graph;
+
+    @Inject
+    public TaskRegistry(AtlasGraph graph) {
+        this.graph = graph;
+    }
+
+    @GraphTransaction
+    public AtlasTask save(AtlasTask task) {
+        AtlasVertex vertex = createVertex(task);
+
+        return toAtlasTask(vertex);
+    }
+
+    @GraphTransaction
+    public List<AtlasTask> getPendingTasks() {
+        List<AtlasTask> ret = new ArrayList<>();
+
+        try {
+            AtlasGraphQuery query = graph.query()
+                                         .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+                                         .has(Constants.TASK_STATUS, AtlasTask.Status.PENDING)
+                                         .orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
+
+            Iterator<AtlasVertex> results = query.vertices().iterator();
+
+            while (results.hasNext()) {
+                AtlasVertex vertex = results.next();
+
+                ret.add(toAtlasTask(vertex));
+            }
+        } catch (Exception exception) {
+            LOG.error("Error fetching pending tasks!", exception);
+        } finally {
+            graph.commit();
+        }
+
+        return ret;
+    }
+
+    @GraphTransaction
+    public void updateStatus(AtlasVertex taskVertex, AtlasTask task) {
+        if (taskVertex == null) {
+            return;
+        }
+
+        setEncodedProperty(taskVertex, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
+        setEncodedProperty(taskVertex, Constants.TASK_STATUS, task.getStatus().toString());
+        setEncodedProperty(taskVertex, Constants.TASK_UPDATED_TIME, System.currentTimeMillis());
+        setEncodedProperty(taskVertex, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());
+    }
+
+    @GraphTransaction
+    public void deleteByGuid(String guid) throws AtlasBaseException {
+        try {
+            AtlasGraphQuery query = graph.query()
+                                         .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+                                         .has(TASK_GUID, guid);
+
+            Iterator<AtlasVertex> results = query.vertices().iterator();
+
+            if (results.hasNext()) {
+                graph.removeVertex(results.next());
+            }
+        } catch (Exception exception) {
+            LOG.error("Error: deletingByGuid: {}", guid);
+
+            throw new AtlasBaseException(exception);
+        }
+    }
+
+    @GraphTransaction
+    public void deleteComplete(AtlasVertex taskVertex, AtlasTask task) {
+        updateStatus(taskVertex, task);
+
+        deleteVertex(taskVertex);
+    }
+
+    @GraphTransaction
+    public AtlasTask getById(String guid) {
+        AtlasGraphQuery query = graph.query()
+                                     .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+                                     .has(TASK_GUID, guid);
+
+        Iterator<AtlasVertex> results = query.vertices().iterator();
+
+        return results.hasNext() ? toAtlasTask(results.next()) : null;
+    }
+
+    @GraphTransaction
+    public AtlasVertex getVertex(String taskGuid) {
+        AtlasGraphQuery query = graph.query().has(Constants.TASK_GUID, taskGuid);
+
+        Iterator<AtlasVertex> results = query.vertices().iterator();
+
+        return results.hasNext() ? results.next() : null;
+    }
+
+    @GraphTransaction
+    public List<AtlasTask> getAll() {
+        List<AtlasTask> ret = new ArrayList<>();
+        AtlasGraphQuery query = graph.query()
+                                     .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+                                     .orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
+
+        Iterator<AtlasVertex> results = query.vertices().iterator();
+
+        while (results.hasNext()) {
+            ret.add(toAtlasTask(results.next()));
+        }
+
+        return ret;
+    }
+
+    public void commit() {
+        this.graph.commit();
+    }
+
+    public AtlasTask createVertex(String taskType, String createdBy, Map<String, Object> parameters) {
+        AtlasTask ret = new AtlasTask(taskType, createdBy, parameters);
+
+        createVertex(ret);
+
+        return ret;
+    }
+
+    private void deleteVertex(AtlasVertex taskVertex) {
+        if (taskVertex == null) {
+            return;
+        }
+
+        graph.removeVertex(taskVertex);
+    }
+
+    private AtlasTask toAtlasTask(AtlasVertex v) {
+        AtlasTask ret = new AtlasTask();
+
+        ret.setGuid(v.getProperty(Constants.TASK_GUID, String.class));
+        ret.setType(v.getProperty(Constants.TASK_TYPE, String.class));
+        ret.setStatus(v.getProperty(Constants.TASK_STATUS, String.class));
+        ret.setCreatedBy(v.getProperty(Constants.TASK_CREATED_BY, String.class));
+        ret.setCreatedTime(new Date(v.getProperty(Constants.TASK_CREATED_TIME, Long.class)));
+        ret.setUpdatedTime(new Date(v.getProperty(Constants.TASK_UPDATED_TIME, Long.class)));
+
+        Long startTime = v.getProperty(Constants.TASK_START_TIME, Long.class);
+        if (startTime != null) {
+            ret.setStartTime(new Date(startTime));
+        }
+
+        Long endTime = v.getProperty(Constants.TASK_END_TIME, Long.class);
+        if (endTime != null) {
+            ret.setEndTime(new Date(endTime));
+        }
+
+        String parametersJson = v.getProperty(Constants.TASK_PARAMETERS, String.class);
+        ret.setParameters(AtlasType.fromJson(parametersJson, Map.class));
+
+        ret.setAttemptCount(v.getProperty(Constants.TASK_ATTEMPT_COUNT, Integer.class));
+        ret.setErrorMessage(v.getProperty(Constants.TASK_ERROR_MESSAGE, String.class));
+
+        return ret;
+    }
+
+    private AtlasVertex createVertex(AtlasTask task) {
+        AtlasVertex ret = graph.addVertex();
+
+        setEncodedProperty(ret, Constants.TASK_GUID, task.getGuid());
+        setEncodedProperty(ret, Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME);
+        setEncodedProperty(ret, Constants.TASK_STATUS, task.getStatus().toString());
+        setEncodedProperty(ret, Constants.TASK_TYPE, task.getType());
+        setEncodedProperty(ret, Constants.TASK_CREATED_BY, task.getCreatedBy());
+        setEncodedProperty(ret, Constants.TASK_CREATED_TIME, task.getCreatedTime());
+        setEncodedProperty(ret, Constants.TASK_UPDATED_TIME, task.getUpdatedTime());
+
+        if (task.getStartTime() != null) {
+            setEncodedProperty(ret, Constants.TASK_START_TIME, task.getStartTime().getTime());
+        }
+
+        if (task.getEndTime() != null) {
+            setEncodedProperty(ret, Constants.TASK_END_TIME, task.getEndTime().getTime());
+        }
+
+        setEncodedProperty(ret, Constants.TASK_PARAMETERS, AtlasJson.toJson(task.getParameters()));
+        setEncodedProperty(ret, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
+        setEncodedProperty(ret, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());
+
+        return ret;
+    }
+}
\ No newline at end of file
diff --git a/webapp/src/main/java/org/apache/atlas/BeanUtil.java b/repository/src/main/java/org/apache/atlas/util/BeanUtil.java
similarity index 97%
rename from webapp/src/main/java/org/apache/atlas/BeanUtil.java
rename to repository/src/main/java/org/apache/atlas/util/BeanUtil.java
index ef5a741..15704bc 100644
--- a/webapp/src/main/java/org/apache/atlas/BeanUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/BeanUtil.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.atlas;
+package org.apache.atlas.util;
 
 import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java
index 71b0a4a..8dda208 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -64,9 +64,11 @@ import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
 import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
 import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory;
 import org.apache.atlas.runner.LocalSolrRunner;
 import org.apache.atlas.service.Service;
 import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.tasks.TaskManagement;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.AtlasRepositoryConfiguration;
 import org.apache.atlas.util.SearchTracker;
@@ -183,7 +185,11 @@ public class TestModules {
             // Glossary related bindings
             bind(GlossaryService.class).asEagerSingleton();
 
-            final GraphTransactionInterceptor graphTransactionInterceptor = new GraphTransactionInterceptor(new AtlasGraphProvider().get());
+            // TaskManagement
+            bind(TaskManagement.class).asEagerSingleton();
+            bind(ClassificationPropagateTaskFactory.class).asEagerSingleton();
+
+            final GraphTransactionInterceptor graphTransactionInterceptor = new GraphTransactionInterceptor(new AtlasGraphProvider().get(), null);
             requestInjection(graphTransactionInterceptor);
             bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), graphTransactionInterceptor);
         }
diff --git a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
index 9846d43..c277806 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java
@@ -152,7 +152,7 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
         List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
 
         Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
-        assertEquals(entityHeaders.size(), 1);
+        assertEquals(entityHeaders.size(), 11);
     }
 
 
diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
new file mode 100644
index 0000000..e309a76
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.tagpropagation;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.exception.EntityNotFoundException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.AtlasTestBase;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.atlas.repository.impexp.ZipFileResourceTestUtils;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.tasks.TaskManagement;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.lang.StringUtils;
+import org.testng.SkipException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
+import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
+    private static final String IMPORT_FILE = "tag-propagation-data.zip";
+
+    private static final String HDFS_PATH_EMPLOYEES = "a3955120-ac17-426f-a4af-972ec8690e5f";
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasEntityStore entityStore;
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    private EntityGraphMapper entityGraphMapper;
+
+    @Inject
+    private TaskManagement tasksManagement;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        RequestContext.clear();
+
+        super.initialize();
+
+        this.tasksManagement.start();
+        entityGraphMapper.setTasksUseFlag(true);
+
+        loadModelFilesAndImportTestData();
+    }
+    private void loadModelFilesAndImportTestData() {
+        try {
+            loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
+            loadModelFromJson("1000-Hadoop/1020-fs_model.json", typeDefStore, typeRegistry);
+            loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
+
+            loadSampleClassificationDefs();
+
+            runImportWithNoParameters(importService, getZipSource(IMPORT_FILE));
+        } catch (AtlasBaseException | IOException e) {
+            throw new SkipException("Model loading failed!");
+        }
+    }
+
+    private void loadSampleClassificationDefs() throws AtlasBaseException {
+        AtlasClassificationDef tagX = new AtlasClassificationDef("tagX");
+        AtlasClassificationDef tagY = new AtlasClassificationDef("tagY");
+
+        typeDefStore.createTypesDef(new AtlasTypesDef(Collections.emptyList(), Collections.emptyList(),
+                Arrays.asList(tagX, tagY),
+                Collections.emptyList(), Collections.emptyList()));
+    }
+
+    public static InputStream getZipSource(String fileName) throws IOException {
+        return ZipFileResourceTestUtils.getFileInputStream(fileName);
+    }
+
+    @Test
+    public void parameterValidation() throws AtlasBaseException {
+        try {
+            entityGraphMapper.propagateClassification(null, null, null);
+            entityGraphMapper.propagateClassification("unknown", "abcd", "xyz");
+        }
+        catch (AtlasBaseException e) {
+            assertNotNull(e.getCause());
+            assertTrue(e.getCause() instanceof EntityNotFoundException);
+        }
+
+        List<String> ret = entityGraphMapper.propagateClassification(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
+        assertNull(ret);
+
+        ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
+        assertNull(ret);
+
+        ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY);
+        assertNull(ret);
+
+        AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
+        ret = entityGraphMapper.propagateClassification(hdfs_employees.getGuid(), StringUtils.EMPTY, StringUtils.EMPTY);
+        assertNull(ret);
+    }
+
+    @Test
+    public void add() throws AtlasBaseException {
+        final String TAG_NAME_X = "tagX";
+        final String TAG_NAME_Y = "tagY";
+
+        AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
+
+        AtlasClassification tagX = new AtlasClassification(TAG_NAME_X);
+        tagX.setEntityGuid(hdfs_employees.getGuid());
+        tagX.setPropagate(true);
+
+        AtlasClassification tagY = new AtlasClassification(TAG_NAME_Y);
+        tagY.setEntityGuid(hdfs_employees.getGuid());
+        tagY.setPropagate(false);
+
+        entityStore.addClassification(Collections.singletonList(HDFS_PATH_EMPLOYEES), tagX);
+        entityStore.addClassification(Collections.singletonList(HDFS_PATH_EMPLOYEES), tagY);
+
+        AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
+        AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_X);
+
+        assertNotNull(entityVertex);
+        assertNotNull(classificationVertex);
+
+        AtlasEntity entityUpdated = getEntity(HDFS_PATH_EMPLOYEES);
+        assertNotNull(entityUpdated.getPendingTasks());
+        List<String> impactedEntities = entityGraphMapper.propagateClassification(hdfs_employees.getGuid(), classificationVertex.getId().toString(), StringUtils.EMPTY);
+        assertNotNull(impactedEntities);
+    }
+
+    @Test(dependsOnMethods = "add")
+    public void update() throws AtlasBaseException {
+        final String TAG_NAME_Y = "tagY";
+
+        AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
+        AtlasClassification tagY = new AtlasClassification(TAG_NAME_Y);
+        tagY.setEntityGuid(hdfs_employees.getGuid());
+        tagY.setPropagate(true);
+
+        entityStore.updateClassifications(hdfs_employees.getGuid(), Collections.singletonList(tagY));
+
+        AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
+        AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_Y);
+
+        assertNotNull(RequestContext.get().getQueuedTasks());
+        assertTrue(RequestContext.get().getQueuedTasks().size() > 0, "No tasks were queued!");
+
+        assertNotNull(entityVertex);
+        assertNotNull(classificationVertex);
+
+        List<String> impactedEntities = entityGraphMapper.updateClassificationsPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString(), StringUtils.EMPTY);
+        assertNotNull(impactedEntities);
+    }
+
+    @Test(dependsOnMethods = "update")
+    public void delete() throws AtlasBaseException {
+        final String TAG_NAME = "tagX";
+
+        AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
+        entityGraphMapper.propagateClassification(hdfs_employees.getGuid(), StringUtils.EMPTY, StringUtils.EMPTY);
+
+        AtlasClassification tagX = new AtlasClassification(TAG_NAME);
+        tagX.setEntityGuid(hdfs_employees.getGuid());
+        tagX.setPropagate(false);
+
+        AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
+        AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME);
+
+        entityStore.deleteClassification(HDFS_PATH_EMPLOYEES, tagX.getTypeName());
+
+        assertNotNull(entityVertex);
+        assertNotNull(classificationVertex);
+
+        List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(classificationVertex.getId().toString());
+        assertNotNull(impactedEntities);
+    }
+
+    private AtlasEntity getEntity(String entityGuid) throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(entityGuid);
+
+        return entityWithExtInfo.getEntity();
+    }
+}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/tasks/BaseTaskFixture.java b/repository/src/test/java/org/apache/atlas/tasks/BaseTaskFixture.java
new file mode 100644
index 0000000..51e9bf7
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/tasks/BaseTaskFixture.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class BaseTaskFixture {
+    protected static final String SPYING_TASK_ADD            = "add";
+    protected static final String SPYING_TASK_ERROR_THROWING = "errorThrowingTask";
+
+    @Inject
+    protected AtlasGraph graph;
+
+    @Inject
+    protected TaskRegistry taskRegistry;
+
+    static class SpyConcreteTask extends AbstractTask {
+        private boolean taskPerformed;
+
+        public SpyConcreteTask(AtlasTask atlasTask) {
+            super(atlasTask);
+        }
+
+        @Override
+        public AtlasTask.Status perform() {
+            this.taskPerformed = true;
+
+            return AtlasTask.Status.COMPLETE;
+        }
+
+        public boolean taskPerformed() {
+            return this.taskPerformed;
+        }
+    }
+
+    static class SpyErrorThrowingTask extends AbstractTask {
+        private boolean taskPerformed;
+
+        public SpyErrorThrowingTask(AtlasTask atlasTask) {
+            super(atlasTask);
+        }
+
+        @Override
+        public AtlasTask.Status perform() {
+            this.taskPerformed = true;
+
+            throw new NullPointerException("SpyErrorThrowingTask: NullPointerException Encountered!");
+        }
+
+        public boolean taskPerformed() {
+            return this.taskPerformed;
+        }
+    }
+
+    static class SpyingFactory implements TaskFactory {
+        private SpyConcreteTask      addTask;
+        private SpyErrorThrowingTask errorTask;
+
+        @Override
+        public AbstractTask create(AtlasTask atlasTask) {
+            switch (atlasTask.getType()) {
+                case "add":
+                    addTask = new SpyConcreteTask(atlasTask);
+                    return addTask;
+
+                case "errorThrowingTask":
+                    errorTask = new SpyErrorThrowingTask(atlasTask);
+                    return errorTask;
+
+                default:
+                    return null;
+            }
+        }
+
+        @Override
+        public List<String> getSupportedTypes() {
+            return new ArrayList<String>() {{
+                add(SPYING_TASK_ADD);
+                add(SPYING_TASK_ERROR_THROWING);
+            }};
+        }
+
+        public SpyConcreteTask getAddTask() {
+            return this.addTask;
+        }
+
+        public SpyErrorThrowingTask getErrorTask() {
+            return this.errorTask;
+        }
+    }
+
+    protected AtlasTask createTask(TaskManagement taskManagement, String type) {
+        return taskManagement.createTask(type, "testUser", Collections.singletonMap("params", "params"));
+    }
+}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/tasks/TaskExecutorTest.java b/repository/src/test/java/org/apache/atlas/tasks/TaskExecutorTest.java
new file mode 100644
index 0000000..f8d131c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/tasks/TaskExecutorTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.commons.lang3.StringUtils;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TaskExecutorTest extends BaseTaskFixture {
+    @Inject
+    private AtlasGraph graph;
+
+    @Inject
+    private TaskRegistry taskRegistry;
+
+    @Inject
+    private TaskManagement taskManagement;
+
+    @Test
+    public void noTasksExecuted() {
+        TaskManagementTest.SpyingFactory spyingFactory = new TaskManagementTest.SpyingFactory();
+        Map<String, TaskFactory> taskFactoryMap = new HashMap<>();
+        TaskManagement.createTaskTypeFactoryMap(new HashMap<>(), spyingFactory);
+
+        TaskManagement.Statistics statistics = new TaskManagement.Statistics();
+        new TaskExecutor(taskRegistry, taskFactoryMap, statistics);
+
+        Assert.assertEquals(statistics.getTotal(), 0);
+    }
+
+    @Test
+    public void tasksNotPersistedIsNotExecuted() throws InterruptedException {
+        TaskManagementTest.SpyingFactory spyingFactory = new TaskManagementTest.SpyingFactory();
+        Map<String, TaskFactory> taskFactoryMap = new HashMap<>();
+        TaskManagement.createTaskTypeFactoryMap(taskFactoryMap, spyingFactory);
+
+        TaskManagement.Statistics statistics = new TaskManagement.Statistics();
+        TaskExecutor taskExecutor = new TaskExecutor(taskRegistry, taskFactoryMap, statistics);
+
+        taskExecutor.addAll(Collections.singletonList(new AtlasTask(SPYING_TASK_ADD, "test", Collections.emptyMap())));
+
+        taskExecutor.waitUntilDone();
+        Assert.assertEquals(statistics.getTotal(), 0);
+    }
+
+
+    @Test
+    public void persistedIsExecuted() throws AtlasBaseException, InterruptedException {
+        TaskManagementTest.SpyingFactory spyingFactory = new TaskManagementTest.SpyingFactory();
+        Map<String, TaskFactory> taskFactoryMap = new HashMap<>();
+        TaskManagement.createTaskTypeFactoryMap(taskFactoryMap, spyingFactory);
+
+        AtlasTask addTask = taskManagement.createTask("add", "test", Collections.emptyMap());
+        AtlasTask errorThrowingTask = taskManagement.createTask("errorThrowingTask", "test", Collections.emptyMap());
+
+        TaskManagement.Statistics statistics = new TaskManagement.Statistics();
+        List<AtlasTask> tasks = new ArrayList<AtlasTask>() {{
+            add(addTask);
+            add(errorThrowingTask);
+            }};
+        graph.commit();
+
+        TaskExecutor taskExecutor = new TaskExecutor(taskRegistry, taskFactoryMap, statistics);
+        taskExecutor.addAll(tasks);
+
+        taskExecutor.waitUntilDone();
+        Assert.assertEquals(statistics.getTotal(), 2);
+        Assert.assertEquals(statistics.getTotalSuccess(), 1);
+        Assert.assertEquals(statistics.getTotalError(), 1);
+
+        Assert.assertNotNull(spyingFactory.getAddTask());
+        Assert.assertNotNull(spyingFactory.getErrorTask());
+
+        Assert.assertTrue(spyingFactory.getAddTask().taskPerformed());
+        Assert.assertTrue(spyingFactory.getErrorTask().taskPerformed());
+
+        assertTaskUntilFail(errorThrowingTask, taskExecutor);
+    }
+
+    private void assertTaskUntilFail(AtlasTask errorThrowingTask, TaskExecutor taskExecutor) throws AtlasBaseException, InterruptedException {
+        AtlasTask errorTaskFromDB = taskManagement.getByGuid(errorThrowingTask.getGuid());
+        Assert.assertNotNull(errorTaskFromDB);
+        Assert.assertTrue(StringUtils.isNotEmpty(errorTaskFromDB.getErrorMessage()));
+        Assert.assertEquals(errorTaskFromDB.getAttemptCount(), 1);
+        Assert.assertEquals(errorTaskFromDB.getStatus(), AtlasTask.Status.PENDING);
+
+        for (int i = errorTaskFromDB.getAttemptCount(); i <= AtlasTask.MAX_ATTEMPT_COUNT; i++) {
+            taskExecutor.addAll(Collections.singletonList(errorThrowingTask));
+        }
+
+        taskExecutor.waitUntilDone();
+        graph.commit();
+        Assert.assertEquals(errorThrowingTask.getStatus(), AtlasTask.Status.FAILED);
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/tasks/TaskManagementTest.java b/repository/src/test/java/org/apache/atlas/tasks/TaskManagementTest.java
new file mode 100644
index 0000000..6f0b89c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/tasks/TaskManagementTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.testng.Assert;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TaskManagementTest extends BaseTaskFixture {
+
+    private static class NullFactory implements TaskFactory {
+        @Override
+        public AbstractTask create(AtlasTask atlasTask) {
+            return null;
+        }
+
+        @Override
+        public List<String> getSupportedTypes() {
+            return null;
+        }
+    }
+
+    @Test
+    public void factoryReturningNullIsHandled() throws AtlasException {
+        TaskManagement taskManagement = new TaskManagement(null, taskRegistry, new NullFactory());
+        taskManagement.start();
+    }
+
+    @Test
+    public void taskSucceedsTaskVertexRemoved() throws AtlasException, InterruptedException, AtlasBaseException {
+        SpyingFactory spyingFactory = new SpyingFactory();
+        TaskManagement taskManagement = new TaskManagement(null, taskRegistry, spyingFactory);
+        taskManagement.start();
+
+        AtlasTask spyTask = createTask(taskManagement, SPYING_TASK_ADD);
+        AtlasTask spyTaskError = createTask(taskManagement, SPYING_TASK_ERROR_THROWING);
+        graph.commit();
+
+        taskManagement.addAll(Arrays.asList(spyTask, spyTaskError));
+
+        TimeUnit.SECONDS.sleep(5);
+        Assert.assertTrue(spyingFactory.getAddTask().taskPerformed());
+        Assert.assertTrue(spyingFactory.getErrorTask().taskPerformed());
+
+        AtlasTask task = taskManagement.getByGuid(spyTask.getGuid());
+        Assert.assertNull(task);
+    }
+
+    @Test
+    public void severalTaskAdds() throws AtlasException, InterruptedException {
+        int MAX_THREADS = 5;
+
+        TaskManagement taskManagement = new TaskManagement(null, taskRegistry);
+        taskManagement.start();
+
+        Thread[] threads = new Thread[MAX_THREADS];
+        for (int i = 0; i < MAX_THREADS; i++) {
+            threads[i] = new Thread(() -> {
+                try {
+                    AtlasTask spyAdd = taskManagement.createTask(SPYING_TASK_ADD, "test", Collections.emptyMap());
+                    AtlasTask spyErr = taskManagement.createTask(SPYING_TASK_ERROR_THROWING, "test", Collections.emptyMap());
+
+                    taskManagement.addAll(Collections.singletonList(spyAdd));
+                    taskManagement.addAll(Collections.singletonList(spyErr));
+
+                    Thread.sleep(10000);
+                    for (int j = 0; j <= AtlasTask.MAX_ATTEMPT_COUNT; j++) {
+                        taskManagement.addAll(Collections.singletonList(spyErr));
+                    }
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            });
+        }
+
+        for (int i = 0; i < MAX_THREADS; i++) {
+            threads[i].start();
+        }
+
+        for (int i = 0; i < MAX_THREADS; i++) {
+            threads[i].join();
+        }
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/tasks/TaskRegistryTest.java b/repository/src/test/java/org/apache/atlas/tasks/TaskRegistryTest.java
new file mode 100644
index 0000000..7f139dc
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/tasks/TaskRegistryTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.tasks;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.testng.Assert;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.List;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TaskRegistryTest {
+    @Inject
+    AtlasGraph graph;
+
+    @Inject
+    TaskRegistry registry;
+
+    @Test
+    public void basic() throws AtlasException, AtlasBaseException {
+        AtlasTask task = new AtlasTask("abcd", "test", Collections.singletonMap("p1", "p1"));
+        Assert.assertNull(registry.getById(task.getGuid()));
+
+        AtlasTask taskFromVertex = registry.save(task);
+        AtlasVertex taskVertex = registry.getVertex(task.getGuid());
+
+        Assert.assertEquals(taskFromVertex.getGuid(), task.getGuid());
+        Assert.assertEquals(taskFromVertex.getType(), task.getType());
+        Assert.assertEquals(taskFromVertex.getAttemptCount(), task.getAttemptCount());
+        Assert.assertEquals(taskFromVertex.getParameters(), task.getParameters());
+        Assert.assertEquals(taskFromVertex.getCreatedBy(), task.getCreatedBy());
+
+        taskFromVertex.incrementAttemptCount();
+        taskFromVertex.setStatusPending();
+        registry.updateStatus(taskVertex, taskFromVertex);
+        registry.commit();
+
+        taskFromVertex = registry.getById(task.getGuid());
+        Assert.assertNotNull(taskVertex);
+        Assert.assertEquals(taskFromVertex.getStatus(), AtlasTask.Status.PENDING);
+        Assert.assertEquals(taskFromVertex.getAttemptCount(), 1);
+
+        registry.deleteByGuid(taskFromVertex.getGuid());
+
+        try {
+            AtlasTask t = registry.getById(taskFromVertex.getGuid());
+            Assert.assertNull(t);
+        }
+        catch (IllegalStateException e) {
+            Assert.assertTrue(true, "Indicates vertex is deleted!");
+        }
+    }
+
+    @Test
+    public void pendingTasks() throws AtlasBaseException {
+        final int MAX_TASKS = 3;
+        final String TASK_TYPE_FORMAT = "abcd:%d";
+
+        for (int i = 0; i < MAX_TASKS; i++) {
+            AtlasTask task = new AtlasTask(String.format(TASK_TYPE_FORMAT, i), "test", Collections.singletonMap("p1", "p1"));
+            registry.save(task);
+        }
+
+        List<AtlasTask> pendingTasks = registry.getPendingTasks();
+        Assert.assertEquals(pendingTasks.size(), MAX_TASKS);
+
+        for (int i = 0; i < MAX_TASKS; i++) {
+            Assert.assertEquals(pendingTasks.get(i).getType(), String.format(TASK_TYPE_FORMAT, i));
+            registry.deleteByGuid(pendingTasks.get(i).getGuid());
+        }
+
+        graph.commit();
+        pendingTasks = registry.getPendingTasks();
+        Assert.assertEquals(pendingTasks.size(), 0);
+    }
+}
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 37d23c2..e144d36 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.tasks.AtlasTask;
 import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
 import org.apache.commons.lang.StringUtils;
@@ -30,13 +31,13 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.Map;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.HashMap;
+import java.util.Set;
 
 import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
 
@@ -61,6 +62,8 @@ public class RequestContext {
     private final Set<String>                            entitiesToSkipUpdate = new HashSet<>();
     private final Set<String>                            onlyCAUpdateEntities = new HashSet<>();
     private final Set<String>                            onlyBAUpdateEntities = new HashSet<>();
+    private final List<AtlasTask>                        queuedTasks          = new ArrayList<>();
+
 
     private String       user;
     private Set<String>  userGroups;
@@ -122,6 +125,7 @@ public class RequestContext {
         this.entitiesToSkipUpdate.clear();
         this.onlyCAUpdateEntities.clear();
         this.onlyBAUpdateEntities.clear();
+        this.queuedTasks.clear();
 
         if (metrics != null && !metrics.isEmpty()) {
             METRICS.debug(metrics.toString());
@@ -446,6 +450,14 @@ public class RequestContext {
         }
     }
 
+    public void queueTask(AtlasTask task) {
+        queuedTasks.add(task);
+    }
+
+    public List<AtlasTask> getQueuedTasks() {
+        return this.queuedTasks;
+    }
+
     public class EntityGuidPair {
         private final Object entity;
         private final String guid;
diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
index bb7f8fc..ba8f088 100644
--- a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
+++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
@@ -32,8 +32,8 @@ public interface ActiveStateChangeHandler {
         TYPEDEF_STORE_INITIALIZER(2),
         ATLAS_PATCH_SERVICE(3),
         DEFAULT_METADATA_SERVICE(4),
-        NOTIFICATION_HOOK_CONSUMER(5);
-
+        NOTIFICATION_HOOK_CONSUMER(5),
+        TASK_MANAGEMENT(6);
 
         private final int order;
 
diff --git a/test-tools/pom.xml b/test-tools/pom.xml
index 50ee675..245f07f 100644
--- a/test-tools/pom.xml
+++ b/test-tools/pom.xml
@@ -49,10 +49,6 @@
                     <artifactId>gmetric4j</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>jcl-over-slf4j</artifactId>
-                </exclusion>
-                <exclusion>
                     <groupId>com.github.ben-manes.caffeine</groupId>
                     <artifactId>caffeine</artifactId>
                 </exclusion>
diff --git a/test-tools/src/main/resources/log4j.properties b/test-tools/src/main/resources/log4j.properties
index 4db0598..e3e159c 100644
--- a/test-tools/src/main/resources/log4j.properties
+++ b/test-tools/src/main/resources/log4j.properties
@@ -30,3 +30,12 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t]
 # Set both appenders (file and console) on the root logger.
 #log4j.rootLogger=INFO, file, console
 log4j.rootLogger=ERROR, file
+
+# Tasks log file
+log4j.appender.TASKS=org.apache.log4j.RollingFileAppender
+log4j.appender.TASKS.File=targettasks.log
+log4j.appender.TASKS.Append=true
+log4j.appender.TASKS.layout=org.apache.log4j.PatternLayout
+log4j.appender.TASKS.layout.ConversionPattern=[%t] %d %x %m%n
+log4j.appender.TASKS.layout.maxFileSize=1024MB
+log4j.category.TASKS=INFO,TASKS
diff --git a/test-tools/src/main/resources/solr/core-template/solrconfig.xml b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
index 4d7e444..977ebc4 100644
--- a/test-tools/src/main/resources/solr/core-template/solrconfig.xml
+++ b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
@@ -433,29 +433,19 @@
              will be overridden by parameters in the request
           -->
 		<!--
-          35x_t   __guid
-          f0l_t   __typeName
-          i6d_l   __timestamp
-          jr9_t   __state
-          lc5_t   __classificationsText
-          mx1_t   __classificationNames
-          iyt_l   __modificationTimestamp
-          zk5_t  __customAttributes
-          1151_t  __labels
-          ohx_t  __propagatedClassificationNames
-          3thh_s  Asset.__s_name
-          3z0l_s  Asset.__s_owner
-          3wn9_t  Asset.description
-          3v2d_s  Asset.__s_displayName
-          3xfp_s  Asset.__s_userDescription
-          3mdh_t  Referenceable.qualifiedName
-          c45h_t  hive_serde.name
-          cb9h_t  hive_process.queryText
-          c83p_t  hive_process.userName
+		Steps to generate these keys:
+		- Add your fields to GraphBackedSearchIndexer.
+		- Run AtlasDiscoverServiceTest. Most likely the test will fail as the sequence of the fields
+		    loaded from the models would have changed and hence the names would have changed.
+		- From typeRegistry, use typeRegistry.commonIndexFieldNameCache.values().stream().collect(Collectors.joining(" "))
+		    to collect all the field names. Past them here in the 'qf' tag.
+		- Clean and build.
+		- Re-run the test. It should pass now.
+		Thanks to @pshah for the steps.
          -->
         <lst name="defaults">
             <str name="defType">edismax</str>
-            <str name="qf">35x_t f0l_t i6d_l jr9_t lc5_t mx1_t iyt_l zk5_t 1151_t ohx_t 3thh_s 3z0l_s 3wn9_t 3v2d_s 3xfp_s 3mdh_t c45h_t cb9h_t c83p_t</str>
+            <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_t 16o5_t 1891_t 19tx_t 1bet_t 1czp_t 1fd1_t 1gxx_l 1iit_t 3v2d_t 47ph_s 464l_s 43r9_s 426d_s 45c5_t 4d8l_t 49ad_t 4av9_l 4bnp_t 4hz9_t 4ged_t 4jk5_t 4qo5_t 4w79_t 50xx_t 4ykl_t 4zd1_t 4xs5_t 579h_t 56h1_t 5f5x_l 5gqt_l 5dl1_t 5c05_t 5ibp_t 581x_t 58ud_t 5af9_t 5slh_t 5tdx_l 5pfp_t 5m9x_l 5nut_l 5j45_t 5r0l_t 8ikl_t 8ao5_t 8k5h_t 8g79_t 8jd1_t 8hs5_ [...]
             <str name="hl.fl">*</str>
             <bool name="hl.requireFieldMatch">true</bool>
             <bool name="lowercaseOperators">true</bool>
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index e8fc111..714b400 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -55,9 +55,11 @@ import org.apache.atlas.repository.impexp.ExportService;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.MigrationProgressService;
 import org.apache.atlas.repository.impexp.ZipSink;
+import org.apache.atlas.model.tasks.AtlasTask;
 import org.apache.atlas.repository.patches.AtlasPatchManager;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetricsService;
+import org.apache.atlas.tasks.TaskManagement;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.SearchTracker;
@@ -78,6 +80,7 @@ import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import javax.servlet.http.HttpServletRequest;
@@ -135,7 +138,7 @@ public class AdminResource {
     private static final String UI_DATE_FORMAT                 = "atlas.ui.date.format";
     private static final String UI_DATE_DEFAULT_FORMAT         = "MM/DD/YYYY hh:mm:ss A";
     private static final String OPERATION_STATUS               = "operationStatus";
-    private static final List TIMEZONE_LIST  = Arrays.asList(TimeZone.getAvailableIDs());
+    private static final List TIMEZONE_LIST                    = Arrays.asList(TimeZone.getAvailableIDs());
 
     @Context
     private HttpServletRequest httpServletRequest;
@@ -155,6 +158,7 @@ public class AdminResource {
     private final  MigrationProgressService migrationProgressService;
     private final  ReentrantLock            importExportOperationLock;
     private final  ExportImportAuditService exportImportAuditService;
+    private final  TaskManagement           taskManagement;
     private final  AtlasServerService       atlasServerService;
     private final  AtlasEntityStore         entityStore;
     private final  AtlasPatchManager        patchManager;
@@ -178,7 +182,8 @@ public class AdminResource {
                          MigrationProgressService migrationProgressService,
                          AtlasServerService serverService,
                          ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore,
-                         AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository) {
+                         AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository,
+                         TaskManagement taskManagement) {
         this.serviceState              = serviceState;
         this.metricsService            = metricsService;
         this.exportService             = exportService;
@@ -193,6 +198,7 @@ public class AdminResource {
         this.patchManager              = patchManager;
         this.auditService              = auditService;
         this.auditRepository           = auditRepository;
+        this.taskManagement            = taskManagement;
 
         if (atlasProperties != null) {
             this.defaultUIVersion = atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V2);
@@ -205,6 +211,11 @@ public class AdminResource {
         }
     }
 
+    @PostConstruct
+    public void init() {
+        taskManagement.queuePendingTasks();
+    }
+
     /**
      * Fetches the thread stack dump for this application.
      *
@@ -737,6 +748,22 @@ public class AdminResource {
         return ret;
     }
 
+    @GET
+    @Path("/tasks")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public List<AtlasTask> getTaskStatus(@QueryParam("guids") List<String> guids) throws AtlasBaseException {
+        return CollectionUtils.isNotEmpty(guids) ? taskManagement.getByGuids(guids) : taskManagement.getAll();
+    }
+
+    @DELETE
+    @Path("/tasks")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public void deleteTask(@QueryParam("guids") List<String> guids) throws AtlasBaseException {
+        if (CollectionUtils.isNotEmpty(guids)) {
+            taskManagement.deleteByGuids(guids);
+        }
+    }
+
     private String getEditableEntityTypes(Configuration config) {
         String ret = DEFAULT_EDITABLE_ENTITY_TYPES;
 
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
index d6658fe..61aa313 100755
--- a/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/EmbeddedServer.java
@@ -20,12 +20,10 @@ package org.apache.atlas.web.service;
 
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.BeanUtil;
-import org.apache.atlas.RequestContext;
+import org.apache.atlas.util.BeanUtil;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.audit.AtlasAuditEntry;
 import org.apache.atlas.repository.audit.AtlasAuditService;
-import org.apache.commons.lang.StringUtils;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -37,8 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Date;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index 77422b2..ef5350d 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -51,7 +51,7 @@ public class AdminResourceTest {
 
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null);
         Response response = adminResource.getStatus();
         assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
         JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
@@ -62,7 +62,7 @@ public class AdminResourceTest {
     public void testResourceGetsValueFromServiceState() throws IOException {
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null);
         Response response = adminResource.getStatus();
 
         verify(serviceState).getState();
diff --git a/webapp/src/test/resources/atlas-application.properties b/webapp/src/test/resources/atlas-application.properties
index 3847a3d..adeade8 100644
--- a/webapp/src/test/resources/atlas-application.properties
+++ b/webapp/src/test/resources/atlas-application.properties
@@ -127,3 +127,6 @@ atlas.authentication.method.kerberos=false
 #########  Gremlin Search Configuration  #########
 # Set to false to disable gremlin search.
 atlas.search.gremlin.enable=true
+
+#########  Configure use of Tasks  #########
+atlas.tasks.enabled=false
diff --git a/webapp/src/test/resources/template_metadata.csv b/webapp/src/test/resources/template_metadata.csv
index 6b6359c..2f662ba 100644
--- a/webapp/src/test/resources/template_metadata.csv
+++ b/webapp/src/test/resources/template_metadata.csv
@@ -1,2 +1,2 @@
 TypeName,UniqueAttributeValue,BusinessAttributeName,BusinessAttributeValue,UniqueAttributeName[optional]
-hive_table_v2,tableqZPo3C488c,bmWithAllTypes.attr8,"Awesome Attribute 1",qualifiedName
+hive_table_v2,tableY2Q72pP2Do,bmWithAllTypes.attr8,"Awesome Attribute 1",qualifiedName