You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/04/16 09:21:31 UTC

[atlas] branch master updated: ATLAS-3132: performance improvements in UniqueAttributesPatch

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new efc4beb  ATLAS-3132: performance improvements in UniqueAttributesPatch
efc4beb is described below

commit efc4bebc1623c9d00fe4fdf0df424918654a73df
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Sun Apr 14 18:31:28 2019 -0700

    ATLAS-3132: performance improvements in UniqueAttributesPatch
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
---
 .../java/org/apache/atlas/pc/WorkItemConsumer.java |  19 +-
 .../java/org/apache/atlas/pc/WorkItemManager.java  |  26 +-
 .../apache/atlas/kafka/EmbeddedKafkaServer.java    |   2 +-
 .../org/apache/atlas/kafka/KafkaNotification.java  |   2 +-
 .../repository/patches/AtlasJavaPatchHandler.java  | 138 --------
 .../repository/patches/AtlasPatchHandler.java      |  68 ++++
 .../repository/patches/AtlasPatchManager.java      |  72 +++++
 .../repository/patches/AtlasPatchRegistry.java     | 220 +++++++++++++
 .../repository/patches/AtlasPatchService.java      |  54 ++++
 .../atlas/repository/patches/PatchContext.java     |  34 +-
 .../repository/patches/UniqueAttributePatch.java   | 356 +++++++++++++++++++++
 .../patches/UniqueAttributePatchHandler.java       | 164 ----------
 .../bootstrap/AtlasTypeDefStoreInitializer.java    | 134 ++------
 .../store/graph/v2/AtlasGraphUtilsV2.java          | 133 ++------
 .../store/graph/v2/EntityGraphRetriever.java       |   2 +-
 .../atlas/patches/AtlasPatchRegistryTest.java      |  78 +++++
 .../notification/NotificationHookConsumer.java     |   2 +-
 .../apache/atlas/web/resources/AdminResource.java  |  15 +-
 .../atlas/web/resources/AdminResourceTest.java     |   4 +-
 19 files changed, 944 insertions(+), 579 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
index b7eb4d8..8351b7c 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
@@ -30,31 +30,35 @@ import java.util.concurrent.atomic.AtomicLong;
 public abstract class WorkItemConsumer<T> implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class);
 
-    private static final int POLLING_DURATION_SECONDS = 5;
+    private static final int POLLING_DURATION_SECONDS  = 30;
     private static final int DEFAULT_COMMIT_TIME_IN_MS = 15000;
 
     private final BlockingQueue<T> queue;
-    private AtomicBoolean isDirty = new AtomicBoolean(false);
-    private AtomicLong maxCommitTimeInMs = new AtomicLong(0);
-    private CountDownLatch countdownLatch;
-    private BlockingQueue<Object> results;
+    private final AtomicBoolean    isDirty           = new AtomicBoolean(false);
+    private final AtomicLong       maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
+    private CountDownLatch         countdownLatch;
+    private BlockingQueue<Object>  results;
 
     public WorkItemConsumer(BlockingQueue<T> queue) {
-        this.queue = queue;
+        this.queue          = queue;
+        this.countdownLatch = null;
     }
 
     public void run() {
         try {
             while (!Thread.currentThread().isInterrupted()) {
-
                 T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS);
 
                 if (item == null) {
+                    LOG.warn("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing");
+
                     commitDirty();
+
                     return;
                 }
 
                 isDirty.set(true);
+
                 processItem(item);
             }
         } catch (InterruptedException e) {
@@ -67,6 +71,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
 
     public long getMaxCommitTimeInMs() {
         long commitTime = this.maxCommitTimeInMs.get();
+
         return ((commitTime > DEFAULT_COMMIT_TIME_IN_MS) ? commitTime : DEFAULT_COMMIT_TIME_IN_MS);
     }
 
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
index 0e7d3f2..a7ba67c 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
@@ -33,20 +33,20 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
     private final ExecutorService  service;
     private final List<U>          consumers = new ArrayList<>();
     private CountDownLatch         countdownLatch;
-    private BlockingQueue<Object> resultsQueue;
+    private BlockingQueue<Object>  resultsQueue;
 
     public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
         this.numWorkers = numWorkers;
-        workQueue = new LinkedBlockingQueue<>(batchSize * numWorkers);
-        service   = Executors.newFixedThreadPool(numWorkers,
-                                        new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build());
+        this.workQueue  = new LinkedBlockingQueue<>(batchSize * numWorkers);
+        this.service    = Executors.newFixedThreadPool(numWorkers, new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build());
 
         createConsumers(builder, numWorkers, collectResults);
-        execute();
+
+        start();
     }
 
     public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) {
-        this(builder, "workItem", batchSize, numWorkers, false);
+        this(builder, "workItemConsumer", batchSize, numWorkers, false);
     }
 
     public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
@@ -60,6 +60,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
 
         for (int i = 0; i < numWorkers; i++) {
             U c = (U) builder.build(workQueue);
+
             consumers.add(c);
 
             if (collectResults) {
@@ -68,10 +69,12 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
         }
     }
 
-    private void execute() {
+    public void start() {
         this.countdownLatch = new CountDownLatch(numWorkers);
+
         for (U c : consumers) {
             c.setCountDownLatch(countdownLatch);
+
             service.execute(c);
         }
     }
@@ -85,9 +88,14 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
     }
 
     public void checkProduce(T item) {
-        if (countdownLatch.getCount() == 0) {
-            execute();
+        if (countdownLatch.getCount() < numWorkers) {
+            LOG.info("Fewer workers detected: {}", countdownLatch.getCount());
+
+            drain();
+
+            start();
         }
+
         produce(item);
     }
 
diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
index 32b597f..235b7ce 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
@@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit;
 
 
 @Component
-@Order(2)
+@Order(3)
 public class EmbeddedKafkaServer implements Service {
     public static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
 
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 1d0a273..449eb6f 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -48,7 +48,7 @@ import java.util.concurrent.Future;
  * Kafka specific access point to the Atlas notification framework.
  */
 @Component
-@Order(3)
+@Order(4)
 public class KafkaNotification extends AbstractNotification implements Service {
     public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java
deleted file mode 100644
index 9153d49..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.patches;
-
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.collections.MapUtils;
-
-import java.util.Map;
-
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
-import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
-import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser;
-
-public abstract class AtlasJavaPatchHandler {
-    public final AtlasGraph               graph;
-    public final AtlasTypeRegistry        typeRegistry;
-    public final Map<String, PatchStatus> patchesRegistry;
-    public final EntityGraphRetriever     entityRetriever;
-    public final GraphBackedSearchIndexer indexer;
-    public final PatchContext             context;
-    public final String                   patchId;
-    public final String                   patchDescription;
-
-    private PatchStatus patchStatus;
-
-    public static final String JAVA_PATCH_TYPE = "JAVA_PATCH";
-
-    public AtlasJavaPatchHandler(PatchContext context, String patchId, String patchDescription) {
-        this.context          = context;
-        this.graph            = context.getGraph();
-        this.typeRegistry     = context.getTypeRegistry();
-        this.indexer          = context.getIndexer();
-        this.patchesRegistry  = context.getPatchesRegistry();
-        this.patchId          = patchId;
-        this.patchDescription = patchDescription;
-        this.patchStatus      = getPatchStatus(patchesRegistry);
-        this.entityRetriever  = new EntityGraphRetriever(typeRegistry);
-
-        init();
-    }
-
-    private void init() {
-        PatchStatus patchStatus = getPatchStatus();
-
-        if (patchStatus == UNKNOWN) {
-            AtlasVertex patchVertex = graph.addVertex();
-
-            setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
-            setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patchDescription);
-            setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, JAVA_PATCH_TYPE);
-            setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, getPatchStatus().toString());
-            setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
-            setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
-            setEncodedProperty(patchVertex, CREATED_BY_KEY, getCurrentUser());
-            setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
-
-            addToPatchesRegistry(patchId, getPatchStatus());
-        }
-
-        graph.commit();
-    }
-
-    private PatchStatus getPatchStatus(Map<String, PatchStatus> patchesRegistry) {
-        PatchStatus ret = UNKNOWN;
-
-        if (MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId)) {
-            ret = patchesRegistry.get(patchId);
-        }
-
-        return ret;
-    }
-
-    public void updatePatchVertex(PatchStatus patchStatus) {
-        AtlasVertex patchVertex = findByPatchId(patchId);
-
-        if (patchVertex != null) {
-            setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
-            setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
-            setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
-
-            addToPatchesRegistry(getPatchId(), getPatchStatus());
-        }
-
-        graph.commit();
-    }
-
-    public PatchStatus getPatchStatus() {
-        return patchStatus;
-    }
-
-    public void addToPatchesRegistry(String patchId, PatchStatus status) {
-        getPatchesRegistry().put(patchId, status);
-    }
-
-    public void setPatchStatus(PatchStatus patchStatus) {
-        this.patchStatus = patchStatus;
-    }
-
-    public String getPatchId() {
-        return patchId;
-    }
-
-    public Map<String, PatchStatus> getPatchesRegistry() {
-        return patchesRegistry;
-    }
-
-    public abstract void applyPatch();
-}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
new file mode 100644
index 0000000..d8dcfef
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
+
+public abstract class AtlasPatchHandler {
+    public static final String JAVA_PATCH_TYPE = "JAVA_PATCH";
+
+    private final String             patchId;
+    private final String             patchDescription;
+    private final AtlasPatchRegistry patchRegistry;
+    private       PatchStatus        status;
+
+    public AtlasPatchHandler(AtlasPatchRegistry patchRegistry, String patchId, String patchDescription) {
+        this.patchId          = patchId;
+        this.patchDescription = patchDescription;
+        this.patchRegistry    = patchRegistry;
+        this.status           = getStatusFromRegistry();
+
+        register();
+    }
+
+    private void register() {
+        PatchStatus patchStatus = getStatus();
+        // the registry returns null (not UNKNOWN) for an id it has never seen; both mean "not registered yet"
+        if (patchStatus == null || patchStatus == UNKNOWN) {
+            patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, UNKNOWN);
+        }
+    }
+
+    public PatchStatus getStatusFromRegistry() {
+        return patchRegistry.getStatus(patchId);
+    }
+
+    public PatchStatus getStatus() {
+        return status;
+    }
+
+    public void setStatus(PatchStatus status) {
+        this.status = status;
+
+        patchRegistry.updateStatus(patchId, status);
+    }
+
+    public String getPatchId() {
+        return patchId;
+    }
+
+    public abstract void apply();
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
new file mode 100644
index 0000000..629215d
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.model.patches.AtlasPatch;
+import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED;
+
+@Component
+public class AtlasPatchManager {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchManager.class);
+
+    private final PatchContext context;
+
+    @Inject
+    public AtlasPatchManager(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
+        this.context = new PatchContext(atlasGraph, typeRegistry, indexer);
+    }
+
+    public AtlasPatch.AtlasPatches getAllPatches() {
+        return context.getPatchRegistry().getAllPatches();
+    }
+
+    public void applyAll() {
+        final AtlasPatchHandler handlers[] = {
+                new UniqueAttributePatch(context)
+        };
+
+        try {
+            for (AtlasPatchHandler handler : handlers) {
+                PatchStatus patchStatus = handler.getStatusFromRegistry();
+
+                if (patchStatus == APPLIED || patchStatus == SKIPPED) {
+                    LOG.info("Ignoring java handler: {}; status: {}", handler.getPatchId(), patchStatus);
+                } else {
+                    LOG.info("Applying java handler: {}; status: {}", handler.getPatchId(), patchStatus);
+
+                    handler.apply();
+                }
+            }
+        }
+        catch (Exception ex) {
+            LOG.error("Error applying patches.", ex);
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
new file mode 100644
index 0000000..df57959
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.model.patches.AtlasPatch;
+import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
+import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
+import static org.apache.atlas.repository.Constants.*;
+import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
+import static org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer.TYPEDEF_PATCH_TYPE;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIndexSearchPrefix;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser;
+
+public class AtlasPatchRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchRegistry.class);
+
+    private final Map<String, PatchStatus> patchNameStatusMap;
+    private final AtlasGraph               graph;
+
+    public AtlasPatchRegistry(AtlasGraph graph) {
+        this.graph              = graph;
+        this.patchNameStatusMap = getPatchNameStatusForAllRegistered(graph);
+    }
+
+    public boolean isApplicable(String incomingId, String patchFile, int index) {
+        String patchId = getId(incomingId, patchFile, index);
+
+        if (MapUtils.isEmpty(patchNameStatusMap) || !patchNameStatusMap.containsKey(patchId)) {
+            return true;
+        }
+
+        PatchStatus status = patchNameStatusMap.get(patchId);
+
+        if (status == FAILED || status == UNKNOWN) {
+            return true;
+        }
+
+        return false;
+    }
+
+    public PatchStatus getStatus(String id) {
+        return patchNameStatusMap.get(id);
+    }
+
+    public void register(String patchId, String description, String action, PatchStatus patchStatus) {
+        createOrUpdatePatchVertex(graph, patchId, description, action, patchStatus);
+    }
+
+    public void updateStatus(String patchId, PatchStatus patchStatus) {
+        AtlasVertex patchVertex = findByPatchId(patchId);
+
+        if (patchVertex == null) {
+            return;
+        }
+
+        // persist the new state once, together with modification time and user, then mirror it in the in-memory map
+        setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
+        setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+        setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
+
+        patchNameStatusMap.put(patchId, patchStatus);
+
+        graph.commit();
+    }
+
+    private static String getId(String incomingId, String patchFile, int index) {
+        String patchId = incomingId;
+
+        if (StringUtils.isEmpty(patchId)) {
+            return String.format("%s_%s", patchFile, index);
+        }
+
+        return patchId;
+    }
+
+    public AtlasPatches getAllPatches() {
+        return getAllPatches(graph);
+    }
+
+    private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId,
+                                           String description, String action, PatchStatus patchStatus) {
+        boolean     isPatchRegistered = MapUtils.isNotEmpty(patchNameStatusMap) && patchNameStatusMap.containsKey(patchId);
+        AtlasVertex patchVertex       = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex();
+
+        setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
+        setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, description);
+        setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE);
+        setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, action);
+        setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
+        setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+        setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
+        setEncodedProperty(patchVertex, CREATED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
+        setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
+
+        graph.commit();
+    }
+
+    private static Map<String, PatchStatus> getPatchNameStatusForAllRegistered(AtlasGraph graph) {
+        Map<String, PatchStatus> ret     = new HashMap<>();
+        AtlasPatches             patches = getAllPatches(graph);
+
+        for (AtlasPatch patch : patches.getPatches()) {
+            String      patchId     = patch.getId();
+            PatchStatus patchStatus = patch.getStatus();
+
+            if (patchId != null && patchStatus != null) {
+                ret.put(patchId, patchStatus);
+            }
+        }
+
+        return ret;
+    }
+
+    private static AtlasPatches getAllPatches(AtlasGraph graph) {
+        List<AtlasPatch> ret            = new ArrayList<>();
+        String           idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)";
+        AtlasIndexQuery  idxQuery       = graph.indexQuery(VERTEX_INDEX, idxQueryString);
+        // best-effort lookup: a failure is logged with its cause and whatever was collected so far is returned
+        try {
+            Iterator<Result<Object, Object>> results = idxQuery.vertices();
+
+            while (results != null && results.hasNext()) {
+                AtlasVertex patchVertex = results.next().getVertex();
+                AtlasPatch patch = toAtlasPatch(patchVertex);
+
+                ret.add(patch);
+            }
+
+            if (CollectionUtils.isNotEmpty(ret)) {
+                Collections.sort(ret, Comparator.comparing(AtlasPatch::getId));
+            }
+        } catch (Throwable t) {
+            LOG.warn("getAllPatches(): Returned empty result!", t);
+        }
+
+        graph.commit();
+
+        return new AtlasPatches(ret);
+    }
+
+
+    private static AtlasPatch toAtlasPatch(AtlasVertex vertex) {
+        AtlasPatch ret = new AtlasPatch();
+
+        ret.setId(getEncodedProperty(vertex, PATCH_ID_PROPERTY_KEY, String.class));
+        ret.setDescription(getEncodedProperty(vertex, PATCH_DESCRIPTION_PROPERTY_KEY, String.class));
+        ret.setType(getEncodedProperty(vertex, PATCH_TYPE_PROPERTY_KEY, String.class));
+        ret.setAction(getEncodedProperty(vertex, PATCH_ACTION_PROPERTY_KEY, String.class));
+        ret.setCreatedBy(getEncodedProperty(vertex, CREATED_BY_KEY, String.class));
+        ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class));
+        ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class));
+        ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class));
+        ret.setStatus(getPatchStatus(vertex));
+
+        return ret;
+    }
+
+    private static AtlasVertex findByPatchId(String patchId) {
+        AtlasVertex                      ret        = null;
+        String                           indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (" + patchId + ")";
+        Iterator<Result<Object, Object>> results    = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices();
+
+        while (results != null && results.hasNext()) {
+            ret = results.next().getVertex();
+
+            if (ret != null) {
+                break;
+            }
+        }
+
+        return ret;
+    }
+
+    private static PatchStatus getPatchStatus(AtlasVertex vertex) {
+        String patchStatus = AtlasGraphUtilsV2.getEncodedProperty(vertex, PATCH_STATE_PROPERTY_KEY, String.class);
+
+        return patchStatus != null ? PatchStatus.valueOf(patchStatus) : UNKNOWN;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchService.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchService.java
new file mode 100644
index 0000000..fc21285
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchService.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.service.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+
+/**
+ * Service whose {@link #start()} invokes {@link AtlasPatchManager#applyAll()}.
+ */
+@Component
+@Order(2)
+public class AtlasPatchService implements Service {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasPatchService.class);
+
+    private final AtlasPatchManager patchManager;
+
+    /**
+     * @param patchManager manager that is asked to apply its patches on {@link #start()}
+     */
+    @Inject
+    public AtlasPatchService(AtlasPatchManager patchManager) {
+        this.patchManager = patchManager;
+    }
+
+    /**
+     * Logs the start message and then asks the patch manager to apply all patches.
+     */
+    @Override
+    public void start() throws AtlasException {
+        LOG.info("PatchService: Started.");
+
+        patchManager.applyAll();
+    }
+
+    /**
+     * Only logs the stop message.
+     */
+    @Override
+    public void stop() throws AtlasException {
+        LOG.info("PatchService: Stopped.");
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
index a60422b..3508c74 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java
@@ -6,39 +6,33 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.atlas.repository.patches;
 
-import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.type.AtlasTypeRegistry;
 
-import java.util.Map;
-
-/**
- * Patch context for typedef and java patches.
- */
 public class PatchContext {
     private final AtlasGraph               graph;
     private final AtlasTypeRegistry        typeRegistry;
     private final GraphBackedSearchIndexer indexer;
-    private final Map<String, PatchStatus> patchesRegistry;
-
-    public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer,
-                        Map<String, PatchStatus> patchesRegistry) {
-        this.graph           = graph;
-        this.typeRegistry    = typeRegistry;
-        this.indexer         = indexer;
-        this.patchesRegistry = patchesRegistry;
+    private final AtlasPatchRegistry       patchRegistry;
+
+    public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer) {
+        this.graph         = graph;
+        this.typeRegistry  = typeRegistry;
+        this.indexer       = indexer;
+        this.patchRegistry = new AtlasPatchRegistry(this.graph);
     }
 
     public AtlasGraph getGraph() {
@@ -53,7 +47,7 @@ public class PatchContext {
         return indexer;
     }
 
-    public Map<String, PatchStatus> getPatchesRegistry() {
-        return patchesRegistry;
+    public AtlasPatchRegistry getPatchRegistry() {
+        return patchRegistry;
     }
-}
\ No newline at end of file
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
new file mode 100644
index 0000000..c5af500
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.IndexException;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
+import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
+
+public class UniqueAttributePatch extends AtlasPatchHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatch.class);
+
+    private static final String PATCH_ID          = "JAVA_PATCH_0000_001";
+    private static final String PATCH_DESCRIPTION = "Add __u_ property for each unique attribute of active entities";
+
+    private final AtlasGraph               graph;
+    private final GraphBackedSearchIndexer indexer;
+    private final AtlasTypeRegistry        typeRegistry;
+
+    /**
+     * Creates the patch handler, taking the graph, indexer and type registry from the given context.
+     *
+     * @param context patch context supplying the patch registry, graph, indexer and type registry
+     */
+    public UniqueAttributePatch(PatchContext context) {
+        super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
+
+        this.typeRegistry = context.getTypeRegistry();
+        this.indexer      = context.getIndexer();
+        this.graph        = context.getGraph();
+    }
+
+    /**
+     * Registers the unique-attribute indexes for all entity types, runs the vertex processor over the
+     * graph and then marks this patch as APPLIED.
+     *
+     * NOTE(review): the status is set to APPLIED unconditionally; the boolean result of the index
+     * creation is not consulted here - confirm that this is intended.
+     */
+    @Override
+    public void apply() {
+        Map<String, Collection<AtlasAttribute>> uniqueAttributesByType = registerUniqueAttributeForTypes().getAll();
+
+        new UniqueAttributePatchProcessor(this.graph).apply(uniqueAttributesByType);
+
+        setStatus(APPLIED);
+
+        LOG.info("UniqueAttributePatch: {}; status: {}", getPatchId(), getStatus());
+    }
+
+    /**
+     * Walks over every entity type in the type registry, creates the index for its unique attributes
+     * and records those attributes in a cache keyed by type name.
+     *
+     * @return cache holding the unique attributes of each entity type
+     */
+    private TypeNameAttributeCache registerUniqueAttributeForTypes() {
+        TypeNameAttributeCache cache = new TypeNameAttributeCache();
+
+        for (AtlasEntityType type : typeRegistry.getAllEntityTypes()) {
+            Collection<AtlasAttribute> uniqueAttributes = type.getUniqAttributes().values();
+
+            createIndexForUniqueAttributes(type.getTypeName(), uniqueAttributes);
+            cache.add(type, uniqueAttributes);
+        }
+
+        return cache;
+    }
+
+    /**
+     * Creates a vertex index for the unique property of each given attribute, unless a property key
+     * with that name already exists, and commits the management transaction and the graph.
+     *
+     * @param typeName   name of the entity type the attributes belong to; used for logging only
+     * @param attributes unique attributes of the type
+     * @return true when the indexes were committed; false when an IndexException was raised
+     */
+    private boolean createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
+        try {
+            AtlasGraphManagement mgmt = graph.getManagementSystem();
+
+            for (AtlasAttribute attr : attributes) {
+                String uniquePropertyName = attr.getVertexUniquePropertyName();
+
+                // only property keys that are not yet present in the graph need an index
+                if (mgmt.getPropertyKey(uniquePropertyName) == null) {
+                    AtlasAttributeDef def         = attr.getAttributeDef();
+                    boolean           indexable   = def.getIsIndexable();
+                    Class             valueClass  = indexer.getPrimitiveClass(def.getTypeName());
+                    AtlasCardinality  cardinality = indexer.toAtlasCardinality(def.getCardinality());
+
+                    indexer.createVertexIndex(mgmt, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, valueClass, cardinality, indexable, true);
+                }
+            }
+
+            indexer.commit(mgmt);
+            graph.commit();
+
+            LOG.info("Unique attributes: type: {}: Registered!", typeName);
+        } catch (IndexException e) {
+            LOG.error("Error creating index: type: {}", typeName, e);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    public static class UniqueAttributePatchProcessor {
+        private static final String NUM_WORKERS_PROPERTY = "atlas.patch.unique_attribute_patch.numWorkers";
+        private static final String BATCH_SIZE_PROPERTY  = "atlas.patch.unique_attribute_patch.batchSize";
+        private static final String ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
+        private static final int    NUM_WORKERS;
+        private static final int    BATCH_SIZE;
+
+        private final AtlasGraph graph;
+
+        static {
+            int numWorkers = 3;
+            int batchSize  = 300;
+
+            try {
+                numWorkers = ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, getDefaultNumWorkers());
+                batchSize  = ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
+
+                LOG.info("UniqueAttributePatch: {}={}, {}={}", NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
+            } catch (Exception e) {
+                LOG.error("Error retrieving configuration.", e);
+            }
+
+            NUM_WORKERS = numWorkers;
+            BATCH_SIZE  = batchSize;
+        }
+
+        /**
+         * @param graph graph whose vertices are processed by {@link #apply(Map)}
+         */
+        public UniqueAttributePatchProcessor(AtlasGraph graph) {
+            this.graph = graph;
+        }
+
+        /**
+         * Iterates over all vertices of the graph and submits every entity vertex, together with the
+         * unique attributes of its type, to a {@link WorkItemManager} for processing.
+         *
+         * Exceptions raised while iterating or submitting are logged and not rethrown. The manager, if
+         * one was created, is always shut down; an interruption during shutdown is logged and the
+         * thread's interrupt flag is restored.
+         *
+         * @param typeUniqueAttributeCache unique attributes keyed by entity type name
+         */
+        public void apply(final Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache) {
+            WorkItemManager manager = null;
+
+            try {
+                Iterator<AtlasVertex> iterator = graph.getVertices().iterator();
+
+                // the worker pool is created only when there is at least one vertex to look at
+                if (iterator.hasNext()) {
+                    manager = new WorkItemManager<>(new ConsumerBuilder(graph), BATCH_SIZE, NUM_WORKERS);
+
+                    LOG.info("Processing: Started...");
+
+                    while (iterator.hasNext()) {
+                        AtlasVertex vertex = iterator.next();
+
+                        if (!AtlasGraphUtilsV2.isEntityVertex(vertex)) {
+                            continue;
+                        }
+
+                        String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
+
+                        submitForProcessing(typeName, vertex, manager, typeUniqueAttributeCache.get(typeName));
+                    }
+
+                    manager.drain();
+                }
+            } catch (Exception ex) {
+                LOG.error("Error: ", ex);
+            } finally {
+                if (manager != null) {
+                    try {
+                        manager.shutdown();
+                    } catch (InterruptedException e) {
+                        LOG.error("Interrupted", e);
+
+                        // restore the interrupt flag so that callers can still observe the interruption
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+
+        /**
+         * Reads the ATLAS_SOLR_SHARDS application property (1 when absent) and returns three times its value.
+         *
+         * @return default number of workers
+         * @throws AtlasException declared by the application properties lookup
+         */
+        private static int getDefaultNumWorkers() throws AtlasException {
+            int solrShards = ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1);
+
+            return solrShards * 3;
+        }
+
+        /**
+         * Wraps the vertex id, its type name and the type's unique attributes in a work item and hands
+         * it to the manager.
+         *
+         * @param typeName       type name of the vertex
+         * @param vertex         vertex to process; its id is cast to Long
+         * @param manager        manager that receives the work item
+         * @param uniqAttributes unique attributes of the type
+         */
+        private void submitForProcessing(String typeName, AtlasVertex vertex, WorkItemManager manager, Collection<AtlasAttribute> uniqAttributes) {
+            Long vertexId = (Long) vertex.getId();
+
+            manager.checkProduce(new WorkItem(typeName, vertexId, uniqAttributes));
+        }
+
+
+        /**
+         * Immutable unit of work: the id of a vertex, its type name and the unique attributes of that type.
+         */
+        private static class WorkItem {
+            private final long                       id;
+            private final String                     typeName;
+            private final Collection<AtlasAttribute> uniqueAttributeValues;
+
+            /**
+             * @param typeName              type name of the vertex
+             * @param id                    id of the vertex
+             * @param uniqueAttributeValues unique attributes of the type
+             */
+            public WorkItem(String typeName, long id, Collection<AtlasAttribute> uniqueAttributeValues) {
+                this.id                    = id;
+                this.typeName              = typeName;
+                this.uniqueAttributeValues = uniqueAttributeValues;
+            }
+        }
+
+        /**
+         * Worker that processes {@link WorkItem}s: for an ACTIVE, non-type vertex it copies the value of
+         * each unique attribute into the attribute's unique vertex property, unless the vertex already
+         * has that property. Graph commits are retried a limited number of times.
+         */
+        private static class Consumer extends WorkItemConsumer<WorkItem> {
+            private static final int MAX_COMMIT_RETRY_COUNT = 3;
+
+            private final AtlasGraph graph;
+            private final AtomicLong counter;
+
+            /**
+             * @param graph graph the vertices are read from and committed to
+             * @param queue queue passed on to the base consumer
+             */
+            public Consumer(AtlasGraph graph, BlockingQueue<WorkItem> queue) {
+                super(queue);
+
+                this.graph   = graph;
+                this.counter = new AtomicLong(0);
+            }
+
+            /**
+             * Commits the graph when the number of processed items is a multiple of the batch size.
+             */
+            @Override
+            protected void doCommit() {
+                if (counter.get() % BATCH_SIZE == 0) {
+                    LOG.info("Processed: {}", counter.get());
+
+                    attemptCommit();
+                }
+            }
+
+            /**
+             * Commits the graph, logs the total count and then delegates to the base implementation.
+             */
+            @Override
+            protected void commitDirty() {
+                attemptCommit();
+
+                LOG.info("Total: Commit: {}", counter.get());
+
+                super.commitDirty();
+            }
+
+            /**
+             * Tries to commit the graph up to MAX_COMMIT_RETRY_COUNT times, pausing longer after each
+             * failed attempt. Retrying stops early when the thread is interrupted during the pause; the
+             * interrupt flag is restored in that case.
+             */
+            private void attemptCommit() {
+                for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
+                    try {
+                        graph.commit();
+
+                        break;
+                    } catch(Exception ex) {
+                        // the placeholder is needed for the attempt number to show up; the trailing throwable is logged as the exception
+                        LOG.error("Commit exception: {}", retryCount, ex);
+
+                        try {
+                            Thread.sleep(300 * retryCount);
+                        } catch (InterruptedException e) {
+                            LOG.error("Commit exception: Pause: Interrputed!", e);
+
+                            // keep the interruption visible to the caller and give up retrying
+                            Thread.currentThread().interrupt();
+
+                            break;
+                        }
+                    }
+                }
+            }
+
+            /**
+             * Adds the missing unique-attribute properties to the vertex referenced by the work item.
+             * Items without unique attributes, unknown vertices, type vertices and vertices that are
+             * not ACTIVE are counted but otherwise ignored.
+             *
+             * @param wi work item to process
+             */
+            @Override
+            protected void processItem(WorkItem wi) {
+                counter.incrementAndGet();
+
+                String typeName = wi.typeName;
+
+                if(wi.uniqueAttributeValues == null) {
+                    return;
+                }
+
+                AtlasVertex vertex = graph.getVertex(Long.toString(wi.id));
+
+                if (vertex == null) {
+                    LOG.warn("processItem: AtlasVertex with id: ({}): not found!", wi.id);
+
+                    return;
+                }
+
+                if (AtlasGraphUtilsV2.isTypeVertex(vertex)) {
+                    return;
+                }
+
+                AtlasEntity.Status status = AtlasGraphUtilsV2.getState(vertex);
+
+                if (status != AtlasEntity.Status.ACTIVE) {
+                    return;
+                }
+
+                try {
+                    LOG.debug("processItem: {}", wi.id);
+
+                    for (AtlasAttribute attribute : wi.uniqueAttributeValues) {
+                        String                       uniquePropertyKey = attribute.getVertexUniquePropertyName();
+                        Collection<? extends String> propertyKeys      = vertex.getPropertyKeys();
+                        Object                       uniqAttrValue     = null;
+
+                        if (propertyKeys != null && propertyKeys.contains(uniquePropertyKey)) {
+                            // the unique property is already present on the vertex
+                            LOG.debug("processItem: {}: Skipped!", wi.id);
+                        } else {
+                            try {
+                                String propertyKey = attribute.getVertexPropertyName();
+
+                                uniqAttrValue = EntityGraphRetriever.mapVertexToPrimitive(vertex, propertyKey, attribute.getAttributeDef());
+
+                                AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
+                            } catch(AtlasSchemaViolationException ex) {
+                                LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
+                            }
+                        }
+
+                        // commit() is not defined in this class; presumably inherited from WorkItemConsumer - TODO confirm
+                        commit();
+                    }
+
+                    LOG.debug("processItem: {}: Done!", wi.id);
+                } catch (Exception ex) {
+                    LOG.error("Error found: {}: {}", typeName, wi.id, ex);
+                }
+            }
+        }
+
+        /**
+         * Builds {@link Consumer} instances that all work on the same graph. Declared static because it
+         * uses nothing from the enclosing processor instance.
+         */
+        private static class ConsumerBuilder implements WorkItemBuilder<Consumer, WorkItem> {
+            private final AtlasGraph graph;
+
+            /**
+             * @param graph graph handed to every consumer that is built
+             */
+            public ConsumerBuilder(AtlasGraph graph) {
+                this.graph = graph;
+            }
+
+            /**
+             * @param queue queue the new consumer is created with
+             * @return a new consumer bound to this builder's graph and the given queue
+             */
+            @Override
+            public Consumer build(BlockingQueue<WorkItem> queue) {
+                return new Consumer(graph, queue);
+            }
+        }
+    }
+
+    public static class TypeNameAttributeCache {
+        private Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache = new HashMap<>();
+
+        public void add(AtlasEntityType entityType, Collection<AtlasAttribute> values) {
+            typeUniqueAttributeCache.put(entityType.getTypeName(), values);
+        }
+
+        public Collection<AtlasAttribute> get(String typeName) {
+            return typeUniqueAttributeCache.get(typeName);
+        }
+
+        public boolean has(String typeName) {
+            return typeUniqueAttributeCache.containsKey(typeName);
+        }
+
+        public Map<String, Collection<AtlasAttribute>> getAll() {
+            return typeUniqueAttributeCache;
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java
deleted file mode 100644
index f2238f1..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.patches;
-
-import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.repository.IndexException;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
-import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
-import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.commons.collections.MapUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
-import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findActiveEntityVerticesByType;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
-
-public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler {
-    private static final String PATCH_ID          = "JAVA_PATCH_0000_001";
-    private static final String PATCH_DESCRIPTION = "Add new vertex property for each unique attribute of active entities";
-    private static final Logger LOG               = LoggerFactory.getLogger(UniqueAttributePatchHandler.class);
-
-    public UniqueAttributePatchHandler(PatchContext context) {
-        super(context, PATCH_ID, PATCH_DESCRIPTION);
-    }
-
-    @Override
-    public void applyPatch() {
-        Collection<AtlasEntityType> allEntityTypes = typeRegistry.getAllEntityTypes();
-        boolean                     patchFailed    = false;
-
-        for (AtlasEntityType entityType : allEntityTypes) {
-            String                      typeName          = entityType.getTypeName();
-            Map<String, AtlasAttribute> uniqAttributes    = entityType.getUniqAttributes();
-            int                         patchAppliedCount = 0;
-
-            LOG.info("Applying java patch: {} for type: {}", getPatchId(), typeName);
-
-            if (MapUtils.isNotEmpty(uniqAttributes)) {
-                Collection<AtlasAttribute> attributes = uniqAttributes.values();
-
-                try {
-                    // register unique attribute property keys in graph
-                    registerUniqueAttrPropertyKeys(attributes);
-
-                    Iterator<Result<Object, Object>> iterator = findActiveEntityVerticesByType(typeName);
-
-                    int entityCount = 0;
-
-                    while (iterator != null && iterator.hasNext()) {
-                        AtlasVertex entityVertex = iterator.next().getVertex();
-                        boolean     patchApplied = false;
-
-                        entityCount++;
-
-                        for (AtlasAttribute attribute : attributes) {
-                            String                       uniquePropertyKey = attribute.getVertexUniquePropertyName();
-                            Collection<? extends String> propertyKeys      = entityVertex.getPropertyKeys();
-
-                            if (!propertyKeys.contains(uniquePropertyKey)) {
-                                String            propertyKey   = attribute.getVertexPropertyName();
-                                AtlasAttributeDef attributeDef  = attribute.getAttributeDef();
-                                Object            uniqAttrValue = entityRetriever.mapVertexToPrimitive(entityVertex, propertyKey, attributeDef);
-
-                                // add the unique attribute property to vertex
-                                setEncodedProperty(entityVertex, uniquePropertyKey, uniqAttrValue);
-
-                                try {
-                                    graph.commit();
-
-                                    patchApplied = true;
-
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.debug("{}: Added unique attribute property: {} to entity: {} ({})",
-                                                   PATCH_ID, uniquePropertyKey, getGuid(entityVertex), typeName);
-                                    }
-                                } catch (Throwable t) {
-                                    LOG.warn("Java patch ({}): failed to update entity guid: {}; typeName: {}; attrName: {}; attrValue: {}",
-                                              getPatchId(), getGuid(entityVertex), typeName, attribute.getName(), uniqAttrValue);
-
-                                    continue;
-                                }
-                            }
-                        }
-
-                        if (patchApplied) {
-                            patchAppliedCount++;
-                        }
-
-                        if (entityCount % 1000 == 0) {
-                            LOG.info("Java patch: {} : applied {}; processed {} {} entities.", getPatchId(), patchAppliedCount, entityCount, typeName);
-                        }
-                    }
-                } catch (IndexException e) {
-                    LOG.error("Java patch: {} failed! error: {}", getPatchId(), e);
-
-                    patchFailed = true;
-
-                    break;
-                }
-            }
-
-            LOG.info("Applied java patch ({}) for type: {}; Total processed: {}", getPatchId(), typeName, patchAppliedCount);
-        }
-
-        if (patchFailed) {
-            setPatchStatus(FAILED);
-        } else {
-            setPatchStatus(APPLIED);
-        }
-
-        LOG.info("Applied java patch: {}; status: {}", getPatchId(), getPatchStatus());
-
-        updatePatchVertex(getPatchStatus());
-    }
-
-    private void registerUniqueAttrPropertyKeys(Collection<AtlasAttribute> attributes) throws IndexException {
-        AtlasGraphManagement management = graph.getManagementSystem();
-
-        for (AtlasAttribute attribute : attributes) {
-            String  uniquePropertyName       = attribute.getVertexUniquePropertyName();
-            boolean uniquePropertyNameExists = management.getPropertyKey(uniquePropertyName) != null;
-
-            if (!uniquePropertyNameExists) {
-                AtlasAttributeDef attributeDef   = attribute.getAttributeDef();
-                boolean           isIndexable    = attributeDef.getIsIndexable();
-                String            attribTypeName = attributeDef.getTypeName();
-                Class             propertyClass  = indexer.getPrimitiveClass(attribTypeName);
-                AtlasCardinality  cardinality    = indexer.toAtlasCardinality(attributeDef.getCardinality());
-
-                indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.NONE, propertyClass, cardinality, isIndexable, true);
-            }
-        }
-
-        //Commit indexes
-        indexer.commit(management);
-        graph.commit();
-    }
-}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 78f3faf..662edc9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -21,7 +21,6 @@ package org.apache.atlas.repository.store.bootstrap;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
@@ -41,14 +40,8 @@ import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.repository.patches.AtlasJavaPatchHandler;
-import org.apache.atlas.repository.patches.PatchContext;
-import org.apache.atlas.repository.patches.UniqueAttributePatchHandler;
-import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
+import org.apache.atlas.repository.patches.AtlasPatchRegistry;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
@@ -83,19 +76,6 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED;
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
-import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ACTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getPatchesRegistry;
-import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
 
 /**
  * Class that handles initial loading of models and patches into typedef store
@@ -111,18 +91,16 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
 
     private final AtlasTypeDefStore        typeDefStore;
     private final AtlasTypeRegistry        typeRegistry;
-    private final AtlasGraph               graph;
     private final Configuration            conf;
-    private final GraphBackedSearchIndexer indexer;
+    private final AtlasPatchRegistry       patchRegistry;
 
     @Inject
     public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
-                                        AtlasGraph graph, Configuration conf, GraphBackedSearchIndexer indexer) {
-        this.typeDefStore = typeDefStore;
-        this.typeRegistry = typeRegistry;
-        this.graph        = graph;
-        this.conf         = conf;
-        this.indexer      = indexer;
+                                        AtlasGraph graph, Configuration conf) {
+        this.typeDefStore  = typeDefStore;
+        this.typeRegistry  = typeRegistry;
+        this.conf          = conf;
+        this.patchRegistry = new AtlasPatchRegistry(graph);
     }
 
     @PostConstruct
@@ -157,7 +135,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
 
         String       atlasHomeDir  = System.getProperty("atlas.home");
         String       modelsDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";
-        PatchContext patchContext  = initPatchContext();
 
         if (modelsDirName == null || modelsDirName.length() == 0) {
             LOG.info("Types directory {} does not exist or not readable or has no typedef files", modelsDirName);
@@ -176,49 +153,23 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
 	                        continue;
 	                    } else if (!folder.getName().equals(PATCHES_FOLDER_NAME)){
 	                        // load the models alphabetically in the subfolders apart from patches
-	                        loadModelsInFolder(folder, patchContext);
+	                        loadModelsInFolder(folder);
 	                    }
 	            }
             }
 
             // load any files in the top models folder and any associated patches.
-            loadModelsInFolder(topModeltypesDir, patchContext);
+            loadModelsInFolder(topModeltypesDir);
         }
 
-        // apply java patches
-        applyJavaPatches(patchContext);
-
         LOG.info("<== AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()");
     }
 
-    private void applyJavaPatches(PatchContext context) {
-        // register java patches
-        AtlasJavaPatchHandler[] patches = new AtlasJavaPatchHandler[] { new UniqueAttributePatchHandler(context) };
-
-        // apply java patches
-        for (AtlasJavaPatchHandler patch : patches) {
-            PatchStatus patchStatus = patch.getPatchStatus();
-
-            if (patchStatus == APPLIED || patchStatus == SKIPPED) {
-                LOG.info("Ignoring java patch: {}; status: {}", patch.getPatchId(), patchStatus);
-            } else {
-                LOG.info("Applying java patch: {}; status: {}", patch.getPatchId(), patchStatus);
-
-                patch.applyPatch();
-            }
-        }
-    }
-
-    public PatchContext initPatchContext() {
-        return new PatchContext(graph, typeRegistry, indexer, getPatchesRegistry());
-    }
-
     /**
      * Load all the model files in the supplied folder followed by the contents of the patches folder.
      * @param typesDir
-     * @param context
      */
-    private void loadModelsInFolder(File typesDir, PatchContext context) {
+    private void loadModelsInFolder(File typesDir) {
         LOG.info("==> AtlasTypeDefStoreInitializer({})", typesDir);
 
         String typesDirName = typesDir.getName();
@@ -260,7 +211,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
                 }
             }
 
-            applyTypePatches(typesDir.getPath(), context);
+            applyTypePatches(typesDir.getPath());
         }
         LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir);
     }
@@ -458,11 +409,10 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
         return ret;
     }
 
-    private void applyTypePatches(String typesDirName, PatchContext context) {
-        String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME;
-        File   typePatchesDir     = new File(typePatchesDirName);
-        File[] typePatchFiles     = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;
-        Map<String, PatchStatus> patchesRegistry = context.getPatchesRegistry();
+    private void applyTypePatches(String typesDirName) {
+        String             typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME;
+        File               typePatchesDir     = new File(typePatchesDirName);
+        File[]             typePatchFiles     = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;
 
         if (typePatchFiles == null || typePatchFiles.length == 0) {
             LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName);
@@ -505,7 +455,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
                         }
 
                         int patchIndex = 0;
-
                         for (TypeDefPatch patch : patches.getPatches()) {
                             PatchHandler patchHandler = patchHandlerRegistry.get(patch.getAction());
 
@@ -514,15 +463,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
                                 continue;
                             }
 
-                            String patchId = patch.getId();
-
-                            if (StringUtils.isEmpty(patchId)) {
-                                patchId = typePatchFile.getName() + "_" + patchIndex;
-
-                                patch.setId(patchId);
-                            }
-
-                            if (isPatchApplicable(patchId, patchesRegistry)) {
+                            if (patchRegistry.isApplicable(patch.getId(), patchFile, patchIndex++)) {
                                 PatchStatus status;
 
                                 try {
@@ -531,17 +472,14 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
                                     status = FAILED;
 
                                     LOG.error("Failed to apply {} (status: {}; action: {}) in file: {}. Ignored.",
-                                               patchId, status.toString(), patch.getAction(), patchFile);
+                                               patch.getId(), status.toString(), patch.getAction(), patchFile);
                                 }
 
-                                createOrUpdatePatchVertex(patch, status, patchesRegistry);
-
-                                LOG.info("{} (status: {}; action: {}) in file: {}", patchId, status.toString(), patch.getAction(), patchFile);
+                                patchRegistry.register(patch.id, patch.description, patch.action, status);
+                                LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile);
                             } else {
-                                LOG.info("{} in file: {} already {}. Ignoring.", patchId, patchFile, patchesRegistry.get(patchId).toString());
+                                LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString());
                             }
-
-                            patchIndex++;
                         }
                     } catch (Throwable t) {
                         LOG.error("Failed to apply patches in file {}. Ignored", patchFile, t);
@@ -551,38 +489,6 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
         }
     }
 
-    private boolean isPatchApplicable(String patchId, Map<String, PatchStatus> patchesRegistry) {
-        if (MapUtils.isEmpty(patchesRegistry) || !patchesRegistry.containsKey(patchId)) {
-            return true;
-        }
-
-        PatchStatus status = patchesRegistry.get(patchId);
-
-        if (status == FAILED || status == UNKNOWN) {
-            return true;
-        }
-
-        return false;
-    }
-
-    private void createOrUpdatePatchVertex(TypeDefPatch patch, PatchStatus patchStatus, Map<String, PatchStatus> patchesRegistry) {
-        String      patchId           = patch.getId();
-        boolean     isPatchRegistered = MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId);
-        AtlasVertex patchVertex       = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex();
-
-        setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
-        setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patch.getDescription());
-        setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE);
-        setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, patch.getAction());
-        setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
-        setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
-        setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
-        setEncodedProperty(patchVertex, CREATED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
-        setEncodedProperty(patchVertex, MODIFIED_BY_KEY, AtlasTypeDefGraphStoreV2.getCurrentUser());
-
-        AtlasGraphProvider.getGraphInstance().commit();
-    }
-
     /**
      * typedef patch details
      */
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 80141b4..70b01a5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -28,15 +28,12 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
-import org.apache.atlas.model.patches.AtlasPatch;
-import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
-import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
 import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
 import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
@@ -46,7 +43,6 @@ import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
@@ -57,30 +53,20 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
-import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
 import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
 import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
-import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ACTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
+import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY;
 import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
-import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.*;
+import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.ASC;
+import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.DESC;
 
 /**
  * Utility methods for Graph.
@@ -163,6 +149,10 @@ public class AtlasGraphUtilsV2 {
         return StringUtils.isNotEmpty(getIdFromVertex(vertex)) && StringUtils.isNotEmpty(getTypeName(vertex));
     }
 
+    public static boolean isTypeVertex(AtlasVertex vertex) { // true when the vertex has a non-null String value for TYPENAME_PROPERTY_KEY
+        return vertex.getProperty(TYPENAME_PROPERTY_KEY, String.class) != null; // vertex itself is not null-checked here
+    }
+
     public static boolean isReference(AtlasType type) {
         return isReference(type.getTypeCategory());
     }
@@ -228,11 +218,12 @@ public class AtlasGraphUtilsV2 {
 
         Object existingValue = element.getProperty(propertyName, Object.class);
 
-        if (value == null) {
+        if (value == null || (value instanceof Collection && ((Collection)value).isEmpty())) {
             if (existingValue != null) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Removing property {} from {}", propertyName, toString(element));
                 }
+
                 element.removeProperty(propertyName);
             }
         } else {
@@ -241,7 +232,7 @@ public class AtlasGraphUtilsV2 {
                     LOG.debug("Setting property {} in {}", propertyName, toString(element));
                 }
 
-                if (value instanceof Date) {
+                if ( value instanceof Date) {
                     Long encodedValue = ((Date) value).getTime();
                     element.setProperty(propertyName, encodedValue);
                 } else {
@@ -341,22 +332,6 @@ public class AtlasGraphUtilsV2 {
         return vertex;
     }
 
-    public static AtlasVertex findByPatchId(String patchId) {
-        AtlasVertex ret        = null;
-        String      indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : ("+ patchId +")";
-        Iterator<Result<Object, Object>> results = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices();
-
-        while (results != null && results.hasNext()) {
-            ret = results.next().getVertex();
-
-            if (ret != null) {
-                break;
-            }
-        }
-
-        return ret;
-    }
-
     public static AtlasVertex findByGuid(String guid) {
         AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid);
 
@@ -470,80 +445,12 @@ public class AtlasGraphUtilsV2 {
         return vertex;
     }
 
-    public static Map<String, PatchStatus> getPatchesRegistry() {
-        Map<String, PatchStatus> ret     = new HashMap<>();
-        AtlasPatches             patches = getAllPatches();
-
-        for (AtlasPatch patch : patches.getPatches()) {
-            String      patchId     = patch.getId();
-            PatchStatus patchStatus = patch.getStatus();
-
-            if (patchId != null && patchStatus != null) {
-                ret.put(patchId, patchStatus);
-            }
-        }
-
-        return ret;
-    }
-
-    public static AtlasPatches getAllPatches() {
-        List<AtlasPatch>                 ret            = new ArrayList<>();
-        String                           idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)";
-        AtlasIndexQuery                  idxQuery       = AtlasGraphProvider.getGraphInstance().indexQuery(VERTEX_INDEX, idxQueryString);
-        Iterator<Result<Object, Object>> results;
-
-        try {
-            results = idxQuery.vertices();
-
-            while (results != null && results.hasNext()) {
-                AtlasVertex patchVertex = results.next().getVertex();
-                AtlasPatch  patch       = toAtlasPatch(patchVertex);
-
-                ret.add(patch);
-            }
-
-            // Sort the patches based on patch id
-            if (CollectionUtils.isNotEmpty(ret)) {
-                Collections.sort(ret, (p1, p2) -> p1.getId().compareTo(p2.getId()));
-            }
-        } catch (Throwable t) {
-            // first time idx query is fired, returns no field exists in solr exception
-            LOG.warn("getPatches() returned empty result!");
-        }
-
-        getGraphInstance().commit();
-
-        return new AtlasPatches(ret);
-    }
-
     public int getOpenTransactions() {
         Set openTransactions = getGraphInstance().getOpenTransactions();
 
         return (openTransactions != null) ? openTransactions.size() : 0;
     }
 
-    private static AtlasPatch toAtlasPatch(AtlasVertex vertex) {
-        AtlasPatch ret = new AtlasPatch();
-
-        ret.setId(getEncodedProperty(vertex, PATCH_ID_PROPERTY_KEY, String.class));
-        ret.setDescription(getEncodedProperty(vertex, PATCH_DESCRIPTION_PROPERTY_KEY, String.class));
-        ret.setType(getEncodedProperty(vertex, PATCH_TYPE_PROPERTY_KEY, String.class));
-        ret.setAction(getEncodedProperty(vertex, PATCH_ACTION_PROPERTY_KEY, String.class));
-        ret.setCreatedBy(getEncodedProperty(vertex, CREATED_BY_KEY, String.class));
-        ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class));
-        ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class));
-        ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class));
-        ret.setStatus(getPatchStatus(vertex));
-
-        return ret;
-    }
-
-    private static PatchStatus getPatchStatus(AtlasVertex vertex) {
-        String patchStatus = AtlasGraphUtilsV2.getEncodedProperty(vertex, PATCH_STATE_PROPERTY_KEY, String.class);
-
-        return patchStatus != null ? PatchStatus.valueOf(patchStatus) : UNKNOWN;
-    }
-
     public static List<String> findEntityGUIDsByType(String typename, SortOrder sortOrder) {
         AtlasGraphQuery query = getGraphInstance().query()
                                                   .has(ENTITY_TYPE_PROPERTY_KEY, typename);
@@ -566,20 +473,16 @@ public class AtlasGraphUtilsV2 {
         return ret;
     }
 
-    public static Iterator<Result<Object, Object>> findActiveEntityVerticesByType(String typename) {
-        AtlasIndexQuery indexQuery = getActiveEntityIndexQuery(typename);
-
-        return indexQuery.vertices();
+    public static Iterator<AtlasVertex> findActiveEntityVerticesByType(String typename) { // convenience overload: same lookup against the graph returned by getGraphInstance()
+        return findActiveEntityVerticesByType(getGraphInstance(), typename);
     }
 
-    private static AtlasIndexQuery getActiveEntityIndexQuery(String typename) {
-        StringBuilder sb = new StringBuilder();
-
-        sb.append(INDEX_SEARCH_PREFIX + "\"").append(TYPE_NAME_PROPERTY_KEY).append("\":").append(typename)
-          .append(" AND ")
-          .append(INDEX_SEARCH_PREFIX + "\"").append(STATE_PROPERTY_KEY).append("\":").append(Status.ACTIVE.name());
+    public static Iterator<AtlasVertex> findActiveEntityVerticesByType(AtlasGraph graph, String typename) { // iterates vertices of the given entity type whose state is ACTIVE
+        AtlasGraphQuery query = graph.query() // graph query on the supplied graph (the replaced code built an index-query string instead)
+                                          .has(ENTITY_TYPE_PROPERTY_KEY, typename) // filter: entity-type property equals typename
+                                          .has(STATE_PROPERTY_KEY, Status.ACTIVE.name()); // filter: state property equals "ACTIVE"
 
-        return getGraphInstance().indexQuery(VERTEX_INDEX, sb.toString());
+        return query.vertices().iterator(); // callers now receive AtlasVertex elements directly rather than index-query Result wrappers
     }
 
     public static List<String> findEntityGUIDsByType(String typename) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 03d2c06..3e1e023 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -947,7 +947,7 @@ public class EntityGraphRetriever {
         return ret;
     }
 
-    public Object mapVertexToPrimitive(AtlasElement entityVertex, final String vertexPropertyName, AtlasAttributeDef attrDef) {
+    public static Object mapVertexToPrimitive(AtlasElement entityVertex, final String vertexPropertyName, AtlasAttributeDef attrDef) {
         Object ret = null;
 
         if (AtlasGraphUtilsV2.getEncodedProperty(entityVertex, vertexPropertyName, Object.class) == null) {
diff --git a/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
new file mode 100644
index 0000000..58396e5
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.patches;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.model.patches.AtlasPatch;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.patches.AtlasPatchRegistry;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class) // TestNG Guice integration; what TestOnlyModule binds is not visible from this file
+public class AtlasPatchRegistryTest { // exercises AtlasPatchRegistry in order: empty registry -> register -> status update (chained via dependsOnMethods)
+    @Inject
+    private AtlasGraph graph; // every test builds its own AtlasPatchRegistry on top of this one injected graph
+
+    @Test
+    public void noPatchesRegistered() { // before any register() call, getAllPatches() must report zero patches
+        AtlasPatchRegistry registry = new AtlasPatchRegistry(graph);
+
+        assertPatches(registry, 0);
+    }
+
+    @Test(dependsOnMethods = "noPatchesRegistered") // must run after the empty-registry check, since this test adds a patch
+    public void registerPatch() { // registering one patch makes getAllPatches() report exactly one entry
+        AtlasPatchRegistry registry = new AtlasPatchRegistry(graph);
+
+        registry.register("1", "test patch", "apply", AtlasPatch.PatchStatus.UNKNOWN); // presumably (id, description, action, status) - confirm against AtlasPatchRegistry.register
+
+        assertPatches(registry, 1);
+    }
+
+    @Test(dependsOnMethods = "registerPatch") // needs patch "1" created by registerPatch
+    public void updateStatusForPatch() { // updating the status of patch "1" must be visible through getAllPatches()
+        final AtlasPatch.PatchStatus expectedStatus = AtlasPatch.PatchStatus.APPLIED;
+        String                       patchId        = "1"; // same id that registerPatch() registered
+
+        AtlasPatchRegistry registry = new AtlasPatchRegistry(graph); // fresh registry instance; presumably sees the earlier patch via the shared graph - TODO confirm
+
+        registry.updateStatus(patchId, expectedStatus);
+
+        AtlasPatch.AtlasPatches patches = assertPatches(registry, 1); // still exactly one patch: the update must not add a second entry
+
+        assertEquals(patches.getPatches().get(0).getId(), patchId);
+        assertEquals(patches.getPatches().get(0).getStatus(), expectedStatus);
+    }
+
+
+    private AtlasPatch.AtlasPatches assertPatches(AtlasPatchRegistry registry, int i) { // helper: checks getAllPatches() is non-null with exactly i entries, then returns it
+        AtlasPatch.AtlasPatches patches = registry.getAllPatches();
+
+        assertNotNull(patches);
+        assertEquals(patches.getPatches().size(), i); // TestNG argument order: (actual, expected)
+
+        return patches;
+    }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index ce2d76f..fcfbd21 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -100,7 +100,7 @@ import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE
  * Consumer of notifications from hooks e.g., hive hook etc.
  */
 @Component
-@Order(4)
+@Order(5)
 @DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"})
 public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
     private static final Logger LOG        = LoggerFactory.getLogger(NotificationHookConsumer.class);
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index c5ceb9d..2716873 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -23,16 +23,16 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.authorize.AtlasAdminAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
 import org.apache.atlas.authorize.AtlasEntityAccessRequest;
 import org.apache.atlas.authorize.AtlasPrivilege;
-import org.apache.atlas.authorize.AtlasAuthorizationUtils;
 import org.apache.atlas.discovery.SearchContext;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
 import org.apache.atlas.model.impexp.MigrationStatus;
 import org.apache.atlas.model.instance.AtlasCheckStateRequest;
@@ -46,14 +46,14 @@ import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.MigrationProgressService;
 import org.apache.atlas.repository.impexp.ZipSink;
 import org.apache.atlas.repository.impexp.ZipSource;
+import org.apache.atlas.repository.patches.AtlasPatchManager;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.services.MetricsService;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.SearchTracker;
-import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.Servlets;
@@ -138,6 +138,7 @@ public class AdminResource {
     private final  ExportImportAuditService exportImportAuditService;
     private final  AtlasServerService       atlasServerService;
     private final  AtlasEntityStore         entityStore;
+    private final  AtlasPatchManager        patchManager;
 
     static {
         try {
@@ -152,7 +153,8 @@ public class AdminResource {
                          ExportService exportService, ImportService importService, SearchTracker activeSearches,
                          MigrationProgressService migrationProgressService,
                          AtlasServerService serverService,
-                         ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore) {
+                         ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore,
+                         AtlasPatchManager patchManager) {
         this.serviceState              = serviceState;
         this.metricsService            = metricsService;
         this.exportService             = exportService;
@@ -164,6 +166,7 @@ public class AdminResource {
         this.entityStore               = entityStore;
         this.exportImportAuditService  = exportImportAuditService;
         this.importExportOperationLock = new ReentrantLock();
+        this.patchManager              = patchManager;
     }
 
     /**
@@ -564,7 +567,7 @@ public class AdminResource {
             LOG.debug("==> AdminResource.getAtlasPatches()");
         }
 
-        AtlasPatches ret = AtlasGraphUtilsV2.getAllPatches();
+        AtlasPatches ret = patchManager.getAllPatches();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== AdminResource.getAtlasPatches()");
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index 223a90a..563a16f 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -51,7 +51,7 @@ public class AdminResourceTest {
 
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null);
         Response response = adminResource.getStatus();
         assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
         JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
@@ -62,7 +62,7 @@ public class AdminResourceTest {
     public void testResourceGetsValueFromServiceState() throws IOException {
         when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null);
         Response response = adminResource.getStatus();
 
         verify(serviceState).getState();