You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2019/05/31 17:58:40 UTC

[atlas] branch master updated: ATLAS-3251: Implement Patch to populate classification text for legacy data.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 64c7a3b  ATLAS-3251: Implement Patch to populate classification text for legacy data.
64c7a3b is described below

commit 64c7a3befcc90566f59cb805d51df34bdc7edc74
Author: skoritala <sk...@cloudera.com>
AuthorDate: Fri May 31 10:52:42 2019 -0700

    ATLAS-3251: Implement Patch to populate classification text for legacy data.
    
    Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>
---
 .../repository/patches/AtlasPatchHandler.java      |   3 +-
 .../repository/patches/AtlasPatchManager.java      |   8 +-
 .../patches/ClassificationTextPatch.java           |  81 +++++++
 .../patches/ConcurrentPatchProcessor.java          | 233 +++++++++++++++++++++
 .../atlas/repository/patches/PatchContext.java     |   6 +-
 .../repository/patches/UniqueAttributePatch.java   | 200 +++---------------
 .../store/graph/v2/EntityGraphMapper.java          |  16 +-
 7 files changed, 371 insertions(+), 176 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
index 3e2bf53..970d4d7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.repository.patches;
 
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
 
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
@@ -64,5 +65,5 @@ public abstract class AtlasPatchHandler {
         return patchId;
     }
 
-    public abstract void apply();
+    public abstract void apply() throws AtlasBaseException;
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
index 629215d..259f246 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.patches.AtlasPatch;
 import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,8 +40,8 @@ public class AtlasPatchManager {
     private final PatchContext context;
 
     @Inject
-    public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
-        this.context = new PatchContext(atlasGraph, typeRegistry, indexer);
+    public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer, EntityGraphMapper entityGraphMapper) {
+        this.context = new PatchContext(atlasGraph, typeRegistry, indexer, entityGraphMapper);
     }
 
     public AtlasPatch.AtlasPatches getAllPatches() {
@@ -49,7 +50,8 @@ public class AtlasPatchManager {
 
     public void applyAll() {
         final AtlasPatchHandler handlers[] = {
-                new UniqueAttributePatch(context)
+                new UniqueAttributePatch(context),
+                new ClassificationTextPatch(context)
         };
 
         try {
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
new file mode 100644
index 0000000..eeaf3d9
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+
+public class ClassificationTextPatch extends AtlasPatchHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(ClassificationTextPatch.class);
+
+    private static final String PATCH_ID          = "JAVA_PATCH_0000_002";
+    private static final String PATCH_DESCRIPTION = "Populates Classification Text attribute for entities from classifications applied on them.";
+
+    private final PatchContext context;
+
+    public ClassificationTextPatch(PatchContext context) {
+        super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
+
+        this.context = context;
+    }
+
+    @Override
+    public void apply() throws AtlasBaseException {
+        ConcurrentPatchProcessor patchProcessor = new ClassificationTextPatchProcessor(context);
+
+        patchProcessor.apply();
+
+        setStatus(APPLIED);
+
+        LOG.info("ClassificationTextPatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
+    }
+
+    public static class ClassificationTextPatchProcessor extends ConcurrentPatchProcessor {
+
+        public ClassificationTextPatchProcessor(PatchContext context) {
+            super(context);
+        }
+
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+            processItem(vertexId, vertex, typeName, entityType);
+        }
+
+        @Override
+        protected void prepareForExecution() {
+            //do nothing
+        }
+
+        protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
+            }
+
+            getEntityGraphMapper().updateClassificationText(vertex);
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
+            }
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
new file mode 100644
index 0000000..813a9c1
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+public abstract class ConcurrentPatchProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPatchProcessor.class);
+
+    private static final String     NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
+    private static final String     BATCH_SIZE_PROPERTY  = "atlas.patch.batchSize";
+    private static final String     ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
+    private static final String     WORKER_NAME_PREFIX   = "patchWorkItem";
+    private static final int        NUM_WORKERS;
+    private static final int        BATCH_SIZE;
+    private final EntityGraphMapper entityGraphMapper;
+
+    public AtlasGraph getGraph() {
+        return graph;
+    }
+
+    public GraphBackedSearchIndexer getIndexer() {
+        return indexer;
+    }
+
+    public AtlasTypeRegistry getTypeRegistry() {
+        return typeRegistry;
+    }
+
+    private final AtlasGraph graph;
+    private final GraphBackedSearchIndexer indexer;
+    private final AtlasTypeRegistry typeRegistry;
+
+    static {
+        int numWorkers = 3;
+        int batchSize  = 300;
+
+        try {
+            numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
+            batchSize  = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
+
+            LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
+        } catch (Exception e) {
+            LOG.error("Error retrieving configuration.", e);
+        }
+
+        NUM_WORKERS = numWorkers;
+        BATCH_SIZE  = batchSize;
+    }
+
+    private static int getDefaultNumWorkers() throws AtlasException {
+        return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
+    }
+
+    public ConcurrentPatchProcessor(PatchContext context) {
+        this.graph             = context.getGraph();
+        this.indexer           = context.getIndexer();
+        this.typeRegistry      = context.getTypeRegistry();
+        this.entityGraphMapper = context.getEntityGraphMapper();
+    }
+
+    public EntityGraphMapper getEntityGraphMapper() {
+        return entityGraphMapper;
+    }
+    public void apply() throws AtlasBaseException {
+        prepareForExecution();
+        execute();
+    }
+
+    private void execute() {
+        Iterable<Object> iterable = graph.query().vertexIds();
+        WorkItemManager manager = new WorkItemManager(
+                new ConsumerBuilder(graph, typeRegistry, this), WORKER_NAME_PREFIX,
+                BATCH_SIZE, NUM_WORKERS, false);
+        try {
+            for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
+                Object vertexId = iter.next();
+                submitForProcessing((Long) vertexId, manager);
+            }
+
+            manager.drain();
+        } finally {
+            try {
+                manager.shutdown();
+            } catch (InterruptedException e) {
+                LOG.error("ConcurrentPatchProcessor.execute(): interrupted during WorkItemManager shutdown.", e);
+            }
+        }
+    }
+
+    private void submitForProcessing(Long vertexId, WorkItemManager manager) {
+        manager.checkProduce(vertexId);
+    }
+
+    private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
+        private final AtlasTypeRegistry typeRegistry;
+        private final AtlasGraph graph;
+        private final ConcurrentPatchProcessor patchItemProcessor;
+
+        public ConsumerBuilder(AtlasGraph graph, AtlasTypeRegistry typeRegistry, ConcurrentPatchProcessor patchItemProcessor) {
+            this.graph = graph;
+            this.typeRegistry = typeRegistry;
+            this.patchItemProcessor = patchItemProcessor;
+        }
+
+        @Override
+        public Consumer build(BlockingQueue<Long> queue) {
+            return new Consumer(graph, typeRegistry, queue, patchItemProcessor);
+        }
+    }
+
+    private static class Consumer extends WorkItemConsumer<Long> {
+        private int MAX_COMMIT_RETRY_COUNT = 3;
+        private final AtlasGraph graph;
+        private final AtlasTypeRegistry typeRegistry;
+
+        private final AtomicLong counter;
+        private final ConcurrentPatchProcessor individualItemProcessor;
+
+        public Consumer(AtlasGraph graph, AtlasTypeRegistry typeRegistry, BlockingQueue<Long> queue, ConcurrentPatchProcessor individualItemProcessor) {
+            super(queue);
+
+            this.graph        = graph;
+            this.typeRegistry = typeRegistry;
+            this.counter = new AtomicLong(0);
+            this.individualItemProcessor = individualItemProcessor;
+        }
+
+        @Override
+        protected void doCommit() {
+            if (counter.get() % BATCH_SIZE == 0) {
+                LOG.info("Processed: {}", counter.get());
+
+                attemptCommit();
+            }
+        }
+
+        @Override
+        protected void commitDirty() {
+            attemptCommit();
+
+            LOG.info("Total: Commit: {}", counter.get());
+
+            super.commitDirty();
+        }
+
+        private void attemptCommit() {
+            for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
+                try {
+                    graph.commit();
+
+                    break;
+                } catch(Exception ex) {
+                    LOG.error("Commit exception: ", retryCount, ex);
+
+                    try {
+                        Thread.currentThread().sleep(300 * retryCount);
+                    } catch (InterruptedException e) {
+                        LOG.error("Commit exception: Pause: Interrputed!", e);
+                    }
+                }
+            }
+        }
+
+        @Override
+        protected void processItem(Long vertexId) {
+            counter.incrementAndGet();
+            AtlasVertex vertex = graph.getVertex(Long.toString(vertexId));
+
+            if (vertex == null) {
+                LOG.warn("processItem(vertexId={}): AtlasVertex not found!", vertexId);
+
+                return;
+            }
+
+            if (AtlasGraphUtilsV2.isTypeVertex(vertex)) {
+                return;
+            }
+
+            if (AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
+                return;
+            }
+
+            String          typeName   = AtlasGraphUtilsV2.getTypeName(vertex);
+            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+            if (entityType == null) {
+                return;
+            }
+
+            try {
+                individualItemProcessor.processVertexItem(vertexId, vertex, typeName, entityType);
+            } catch (AtlasBaseException e) {
+                LOG.error("Error processing: {}", vertexId, e);
+            }
+        }
+    }
+
+    protected abstract void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
+    protected abstract void prepareForExecution() throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
index 3508c74..c4b43d0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.patches;
 
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
 import org.apache.atlas.type.AtlasTypeRegistry;
 
 public class PatchContext {
@@ -27,12 +28,14 @@ public class PatchContext {
     private final AtlasTypeRegistry        typeRegistry;
     private final GraphBackedSearchIndexer indexer;
     private final AtlasPatchRegistry       patchRegistry;
+    private final EntityGraphMapper entityGraphMapper;
 
-    public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
+    public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer, EntityGraphMapper entityGraphMapper) {
         this.graph         = graph;
         this.typeRegistry  = typeRegistry;
         this.indexer       = indexer;
         this.patchRegistry = new AtlasPatchRegistry(this.graph);
+        this.entityGraphMapper = entityGraphMapper;
     }
 
     public AtlasGraph getGraph() {
@@ -50,4 +53,5 @@ public class PatchContext {
     public AtlasPatchRegistry getPatchRegistry() {
         return patchRegistry;
     }
+    public EntityGraphMapper getEntityGraphMapper() { return entityGraphMapper;}
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index 9338e74..bd5f525 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -19,16 +19,11 @@ package org.apache.atlas.repository.patches;
 
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.pc.WorkItemBuilder;
-import org.apache.atlas.pc.WorkItemConsumer;
-import org.apache.atlas.pc.WorkItemManager;
 import org.apache.atlas.repository.IndexException;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
 import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
 import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -36,15 +31,11 @@ import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
@@ -64,8 +55,8 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
     }
 
     @Override
-    public void apply() {
-        UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
+    public void apply() throws AtlasBaseException {
+        ConcurrentPatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
 
         patchProcessor.apply();
 
@@ -74,17 +65,13 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
         LOG.info("UniqueAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
     }
 
-    public static class UniqueAttributePatchProcessor {
+    public static class UniqueAttributePatchProcessor extends ConcurrentPatchProcessor {
         private static final String NUM_WORKERS_PROPERTY = "atlas.patch.unique_attribute_patch.numWorkers";
         private static final String BATCH_SIZE_PROPERTY  = "atlas.patch.unique_attribute_patch.batchSize";
         private static final String ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
         private static final int    NUM_WORKERS;
         private static final int    BATCH_SIZE;
 
-        private final AtlasGraph               graph;
-        private final GraphBackedSearchIndexer indexer;
-        private final AtlasTypeRegistry        typeRegistry;
-
         static {
             int numWorkers = 3;
             int batchSize  = 300;
@@ -103,37 +90,23 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
         }
 
         public UniqueAttributePatchProcessor(PatchContext context) {
-            this.graph        = context.getGraph();
-            this.indexer      = context.getIndexer();
-            this.typeRegistry = context.getTypeRegistry();
+            super(context);
         }
 
-        public void apply() {
-            createIndexForUniqueAttributes();
-            addUniqueAttributeToAllVertices();
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+            //process the vertex
+            processItem(vertexId, vertex, typeName, entityType);
         }
 
-        private void addUniqueAttributeToAllVertices() {
-            Iterable<Object> iterable = graph.query().vertexIds();
-            WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry), BATCH_SIZE, NUM_WORKERS);
-            try {
-                for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
-                    Object vertexId = iter.next();
-                    submitForProcessing((Long) vertexId, manager);
-                }
-
-                manager.drain();
-            } finally {
-                try {
-                    manager.shutdown();
-                } catch (InterruptedException e) {
-                    LOG.error("UniqueAttributePatchProcessor.apply(): interrupted during WorkItemManager shutdown", e);
-                }
-            }
+        @Override
+        protected void prepareForExecution() {
+            //create the new attribute for all unique attributes.
+            createIndexForUniqueAttributes();
         }
 
         private void createIndexForUniqueAttributes() {
-            for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
+            for (AtlasEntityType entityType : getTypeRegistry().getAllEntityTypes()) {
 
                 String typeName = entityType.getTypeName();
                 Collection<AtlasAttribute> uniqAttributes = entityType.getUniqAttributes().values();
@@ -150,7 +123,7 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
 
         private void createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
             try {
-                AtlasGraphManagement management = graph.getManagementSystem();
+                AtlasGraphManagement management = getGraph().getManagementSystem();
 
                 for (AtlasAttribute attribute : attributes) {
                     String uniquePropertyName = attribute.getVertexUniquePropertyName();
@@ -162,14 +135,14 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
                     AtlasAttributeDef attributeDef   = attribute.getAttributeDef();
                     boolean           isIndexable    = attributeDef.getIsIndexable();
                     String            attribTypeName = attributeDef.getTypeName();
-                    Class             propertyClass  = indexer.getPrimitiveClass(attribTypeName);
-                    AtlasCardinality  cardinality    = indexer.toAtlasCardinality(attributeDef.getCardinality());
+                    Class             propertyClass  = getIndexer().getPrimitiveClass(attribTypeName);
+                    AtlasCardinality  cardinality    = getIndexer().toAtlasCardinality(attributeDef.getCardinality());
 
-                    indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
+                    getIndexer().createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
                 }
 
-                indexer.commit(management);
-                graph.commit();
+                getIndexer().commit(management);
+                getGraph().commit();
 
                 LOG.info("Unique attributes: type: {}: Registered!", typeName);
             } catch (IndexException e) {
@@ -181,134 +154,29 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
         }
 
-        private void submitForProcessing(Long vertexId, WorkItemManager manager) {
-            manager.checkProduce(vertexId);
-        }
-
-        private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
-            private final AtlasTypeRegistry typeRegistry;
-            private final AtlasGraph graph;
-
-            public ConsumerBuilder(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
-                this.graph = graph;
-                this.typeRegistry = typeRegistry;
-            }
-
-            @Override
-            public Consumer build(BlockingQueue<Long> queue) {
-                return new Consumer(graph, typeRegistry, queue);
-            }
-        }
-
-        private static class Consumer extends WorkItemConsumer<Long> {
-            private int MAX_COMMIT_RETRY_COUNT = 3;
-            private final AtlasGraph graph;
-            private final AtlasTypeRegistry typeRegistry;
-
-            private final AtomicLong counter;
-
-            public Consumer(AtlasGraph graph, AtlasTypeRegistry typeRegistry, BlockingQueue<Long> queue) {
-                super(queue);
-
-                this.graph        = graph;
-                this.typeRegistry = typeRegistry;
-                this.counter = new AtomicLong(0);
-            }
-
-            @Override
-            protected void doCommit() {
-                if (counter.get() % BATCH_SIZE == 0) {
-                    LOG.info("Processed: {}", counter.get());
-
-                    attemptCommit();
-                }
-            }
+        protected void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+            LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
 
-            @Override
-            protected void commitDirty() {
-                attemptCommit();
+            for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
+                String                       uniquePropertyKey = attribute.getVertexUniquePropertyName();
+                Collection<? extends String> propertyKeys      = vertex.getPropertyKeys();
+                Object                       uniqAttrValue     = null;
 
-                LOG.info("Total: Commit: {}", counter.get());
-
-                super.commitDirty();
-            }
-
-            private void attemptCommit() {
-                for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
+                if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
                     try {
-                        graph.commit();
+                        String propertyKey = attribute.getVertexPropertyName();
 
-                        break;
-                    } catch(Exception ex) {
-                        LOG.error("Commit exception: ", retryCount, ex);
+                        uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef());
 
-                        try {
-                            Thread.currentThread().sleep(300 * retryCount);
-                        } catch (InterruptedException e) {
-                            LOG.error("Commit exception: Pause: Interrputed!", e);
-                        }
+                        AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
+                    } catch(AtlasSchemaViolationException ex) {
+                        LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
+                        vertex.removeProperty(uniquePropertyKey);
                     }
                 }
             }
 
-            @Override
-            protected void processItem(Long vertexId) {
-                AtlasVertex vertex = graph.getVertex(Long.toString(vertexId));
-
-                if (vertex == null) {
-                    LOG.warn("processItem(vertexId={}): AtlasVertex not found!", vertexId);
-
-                    return;
-                }
-
-                if (AtlasGraphUtilsV2.isTypeVertex(vertex)) {
-                    return;
-                }
-
-                if (AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
-                    return;
-                }
-
-                String          typeName   = AtlasGraphUtilsV2.getTypeName(vertex);
-                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-                if (entityType == null) {
-                    return;
-                }
-
-                processItem(vertexId, vertex, typeName, entityType);
-            }
-
-            private void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
-                try {
-                    counter.incrementAndGet();
-                    LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
-
-                    for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
-                        String                       uniquePropertyKey = attribute.getVertexUniquePropertyName();
-                        Collection<? extends String> propertyKeys      = vertex.getPropertyKeys();
-                        Object                       uniqAttrValue     = null;
-
-                        if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
-                            try {
-                                String propertyKey = attribute.getVertexPropertyName();
-
-                                uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef());
-
-                                AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
-                            } catch(AtlasSchemaViolationException ex) {
-                                LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
-                                vertex.removeProperty(uniquePropertyKey);
-                            }
-                        }
-                    }
-
-                    LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
-                } catch (Exception ex) {
-                    LOG.error("processItem(typeName={}, vertexId={}): failed!", typeName, vertexId, ex);
-                } finally {
-                    commit();
-                }
-            }
+            LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
         }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 66eeded..64f5503 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -1675,16 +1675,22 @@ public class EntityGraphMapper {
         updateModificationMetadata(entityVertex);
 
         for (Map.Entry<AtlasVertex, List<AtlasClassification>> entry : removedClassifications.entrySet()) {
-            AtlasVertex               vertex                     = entry.getKey();
-            String                    guid                       = GraphHelper.getGuid(vertex);
-            List<AtlasClassification> deletedClassificationNames = entry.getValue();
-            AtlasEntity               entity                     = instanceConverter.getAndCacheEntity(guid);
+            AtlasEntity entity = updateClassificationText(entry.getKey());
+
 
-            vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+            List<AtlasClassification> deletedClassificationNames = entry.getValue();
             entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
         }
     }
 
+    public AtlasEntity updateClassificationText(AtlasVertex vertex) throws AtlasBaseException {
+        String                    guid                       = GraphHelper.getGuid(vertex);
+        AtlasEntity               entity                     = instanceConverter.getAndCacheEntity(guid);
+
+        vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+        return entity;
+    }
+
     public void updateClassifications(EntityMutationContext context, String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isEmpty(classifications)) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_CLASSIFICATION_PARAMS, "update", guid);