You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/04/17 23:35:31 UTC

[atlas] 01/02: ATLAS-3143: PatchFx: Memory usage and performance improvement.

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 8cf09c51e9a8bf4e9077415f0ce21758fc90e5a9
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Wed Apr 17 15:15:37 2019 -0700

    ATLAS-3143: PatchFx: Memory usage and performance improvement.
    
    (cherry picked from commit 0bb18f08e6b63ce344d6b01b744d6ce492e16f50)
---
 .../atlas/repository/graphdb/AtlasGraphQuery.java  |  21 ++
 .../tinkerpop/query/NativeTinkerpopGraphQuery.java |  21 ++
 .../tinkerpop/query/TinkerpopGraphQuery.java       |  57 +++++
 .../graphdb/janus/query/NativeJanusGraphQuery.java |  58 +++++
 .../java/org/apache/atlas/pc/WorkItemConsumer.java |   4 +-
 .../repository/patches/AtlasPatchHandler.java      |   4 +-
 .../repository/patches/AtlasPatchRegistry.java     |  18 +-
 .../repository/patches/UniqueAttributePatch.java   | 270 +++++++++------------
 .../bootstrap/AtlasTypeDefStoreInitializer.java    |   5 +-
 .../atlas/patches/AtlasPatchRegistryTest.java      |   3 +-
 10 files changed, 293 insertions(+), 168 deletions(-)

diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
index 883a31b..1ef4615 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
@@ -96,6 +96,27 @@ public interface AtlasGraphQuery<V, E> {
      */
     Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
 
+    /**
+     * Executes the query and returns IDs of matching vertices.
+     * @return
+     */
+    Iterable<Object> vertexIds();
+
+    /**
+     * Executes the query and returns IDs of the matching vertices from given offset till the max limit
+     * @param limit max number of vertices
+     * @return
+     */
+    Iterable<Object> vertexIds(int limit);
+
+    /**
+     * Executes the query and returns IDs of the matching vertices from given offset till the max limit
+     * @param offset starting offset
+     * @param limit max number of vertices
+     * @return
+     */
+    Iterable<Object> vertexIds(int offset, int limit);
+
 
     /**
      * Adds a predicate that the returned vertices must have the specified
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java
index 19ec5e4..27480e6 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java
@@ -77,6 +77,27 @@ public interface NativeTinkerpopGraphQuery<V, E> {
      */
     Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
 
+    /**
+     * Executes the graph query.
+     * @return
+     */
+    Iterable<Object> vertexIds();
+
+    /**
+     * Executes graph query
+     * @param limit Max vertices to return
+     * @return
+     */
+    Iterable<Object> vertexIds(int limit);
+
+    /**
+     * Executes graph query
+     * @param offset Starting offset
+     * @param limit Max vertices to return
+     * @return
+     */
+    Iterable<Object> vertexIds(int offset, int limit);
+
 
     /**
      * Adds an in condition to the query.
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java
index 5101039..c70e8bf 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java
@@ -237,6 +237,63 @@ public abstract class TinkerpopGraphQuery<V, E> implements AtlasGraphQuery<V, E>
     }
 
     @Override
+    public Iterable<Object> vertexIds() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Executing: " + queryCondition);
+        }
+
+        // Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
+        Set<Object> result = new HashSet<>();
+
+        for(AndCondition andExpr : queryCondition.getAndTerms()) {
+            NativeTinkerpopGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
+            for(Object vertexId : andQuery.vertexIds()) {
+                result.add(vertexId);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public Iterable<Object> vertexIds(int limit) {
+        return vertexIds(0, limit);
+    }
+
+    @Override
+    public Iterable<Object> vertexIds(int offset, int limit) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Executing: " + queryCondition);
+        }
+
+        Preconditions.checkArgument(offset >= 0, "Offset must be non-negative");
+        Preconditions.checkArgument(limit >= 0, "Limit must be non-negative");
+
+        // Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
+        Set<Object> result = new HashSet<>();
+        long resultIdx = 0;
+        for(AndCondition andExpr : queryCondition.getAndTerms()) {
+            if (result.size() == limit) {
+                break;
+            }
+
+            NativeTinkerpopGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
+            for(Object vertexId : andQuery.vertexIds(offset + limit)) {
+                if (resultIdx >= offset) {
+                    result.add(vertexId);
+
+                    if (result.size() == limit) {
+                        break;
+                    }
+                }
+
+                resultIdx++;
+            }
+        }
+
+        return result;
+    }
+
+    @Override
     public AtlasGraphQuery<V, E> has(String propertyKey, QueryOperator operator,
             Object value) {
         queryCondition.andWith(new HasPredicate(propertyKey, operator, value));
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java
index 580caa5..a7a169a 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java
@@ -158,6 +158,64 @@ public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJan
     }
 
     @Override
+    public Iterable<Object> vertexIds() {
+        Set<Object>                result = new HashSet<>();
+        Iterable<JanusGraphVertex> it     = query.vertices();
+
+        for (Iterator<? extends Vertex> iter = it.iterator(); iter.hasNext(); ) {
+            result.add(iter.next().id());
+        }
+
+        return result;
+    }
+
+    @Override
+    public Iterable<Object> vertexIds(int limit) {
+        Set<Object>                result = new HashSet<>(limit);
+        Iterable<JanusGraphVertex> it     = query.limit(limit).vertices();
+
+        if (LOG.isDebugEnabled()) {
+            if (query instanceof GraphCentricQueryBuilder) {
+                LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.VERTEX));
+            } else {
+                LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), query);
+            }
+        }
+
+        for (Iterator<? extends Vertex> iter = it.iterator(); iter.hasNext(); ) {
+            result.add(iter.next().id());
+        }
+
+        return result;
+    }
+
+    @Override
+    public Iterable<Object> vertexIds(int offset, int limit) {
+        Set<Object>                result = new HashSet<>(limit);
+        Iterable<JanusGraphVertex> it     = query.limit(offset + limit).vertices();
+
+        if (LOG.isDebugEnabled()) {
+            if (query instanceof GraphCentricQueryBuilder) {
+                LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.VERTEX));
+            } else {
+                LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), query);
+            }
+        }
+
+        Iterator<? extends Vertex> iter = it.iterator();
+
+        for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
+            if (resultIdx < offset) {
+                continue;
+            }
+
+            result.add(iter.next().id());
+        }
+
+        return result;
+    }
+
+    @Override
     public void in(String propertyName, Collection<? extends Object> values) {
         query.has(propertyName, Contain.IN, values);
 
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
index 8351b7c..9ba4bf4 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public abstract class WorkItemConsumer<T> implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class);
 
-    private static final int POLLING_DURATION_SECONDS  = 30;
+    private static final int POLLING_DURATION_SECONDS  = 5;
     private static final int DEFAULT_COMMIT_TIME_IN_MS = 15000;
 
     private final BlockingQueue<T> queue;
@@ -50,7 +50,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
                 T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS);
 
                 if (item == null) {
-                    LOG.warn("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing");
+                    LOG.debug("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing");
 
                     commitDirty();
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
index d8dcfef..3e2bf53 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
@@ -41,8 +41,8 @@ public abstract class AtlasPatchHandler {
     private void register() {
         PatchStatus patchStatus = getStatus();
 
-        if (patchStatus == UNKNOWN) {
-            patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, getStatus());
+        if (patchStatus == null || patchStatus == UNKNOWN) {
+            patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, "apply", UNKNOWN);
         }
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
index df57959..80c2201 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
@@ -59,8 +59,16 @@ public class AtlasPatchRegistry {
     private final AtlasGraph               graph;
 
     public AtlasPatchRegistry(AtlasGraph graph) {
+        LOG.info("AtlasPatchRegistry: initializing..");
+
         this.graph              = graph;
         this.patchNameStatusMap = getPatchNameStatusForAllRegistered(graph);
+
+        LOG.info("AtlasPatchRegistry: found {} patches", patchNameStatusMap.size());
+
+        for (Map.Entry<String, PatchStatus> entry : patchNameStatusMap.entrySet()) {
+            LOG.info("AtlasPatchRegistry: patchId={}, status={}", entry.getKey(), entry.getValue());
+        }
     }
 
     public boolean isApplicable(String incomingId, String patchFile, int index) {
@@ -83,8 +91,8 @@ public class AtlasPatchRegistry {
         return patchNameStatusMap.get(id);
     }
 
-    public void register(String patchId, String description, String action, PatchStatus patchStatus) {
-        createOrUpdatePatchVertex(graph, patchId, description, action, patchStatus);
+    public void register(String patchId, String description, String patchType, String action, PatchStatus patchStatus) {
+        createOrUpdatePatchVertex(graph, patchId, description, patchType, action, patchStatus);
     }
 
     public void updateStatus(String patchId, PatchStatus patchStatus) {
@@ -118,14 +126,14 @@ public class AtlasPatchRegistry {
         return getAllPatches(graph);
     }
 
-    private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId,
-                                           String description, String action, PatchStatus patchStatus) {
+    private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId, String description,
+                                           String patchType, String action, PatchStatus patchStatus) {
         boolean     isPatchRegistered = MapUtils.isNotEmpty(patchNameStatusMap) && patchNameStatusMap.containsKey(patchId);
         AtlasVertex patchVertex       = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex();
 
         setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
         setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, description);
-        setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE);
+        setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, patchType);
         setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, action);
         setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
         setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index c5af500..9338e74 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -27,19 +27,22 @@ import org.apache.atlas.pc.WorkItemManager;
 import org.apache.atlas.repository.IndexException;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
-import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.repository.graphdb.AtlasCardinality;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -52,72 +55,23 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
     private static final String PATCH_ID          = "JAVA_PATCH_0000_001";
     private static final String PATCH_DESCRIPTION = "Add __u_ property for each unique attribute of active entities";
 
-    private final AtlasGraph               graph;
-    private final GraphBackedSearchIndexer indexer;
-    private final AtlasTypeRegistry        typeRegistry;
+    private final PatchContext context;
 
     public UniqueAttributePatch(PatchContext context) {
         super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
 
-        this.graph        = context.getGraph();
-        this.indexer      = context.getIndexer();
-        this.typeRegistry = context.getTypeRegistry();
+        this.context = context;
     }
 
     @Override
     public void apply() {
-        TypeNameAttributeCache        typeNameAttributeCache = registerUniqueAttributeForTypes();
-        UniqueAttributePatchProcessor patchProcessor         = new UniqueAttributePatchProcessor(this.graph);
+        UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
 
-        patchProcessor.apply(typeNameAttributeCache.getAll());
+        patchProcessor.apply();
 
         setStatus(APPLIED);
 
-        LOG.info("UniqueAttributePatch: {}; status: {}", getPatchId(), getStatus());
-    }
-
-    private TypeNameAttributeCache registerUniqueAttributeForTypes() {
-        TypeNameAttributeCache ret = new TypeNameAttributeCache();
-
-        for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
-            createIndexForUniqueAttributes(entityType.getTypeName(), entityType.getUniqAttributes().values());
-
-            ret.add(entityType, entityType.getUniqAttributes().values());
-        }
-
-        return ret;
-    }
-
-    private boolean createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
-        try {
-            AtlasGraphManagement management = graph.getManagementSystem();
-
-            for (AtlasAttribute attribute : attributes) {
-                String uniquePropertyName = attribute.getVertexUniquePropertyName();
-
-                if (management.getPropertyKey(uniquePropertyName) != null) {
-                    continue;
-                }
-
-                AtlasAttributeDef attributeDef   = attribute.getAttributeDef();
-                boolean           isIndexable    = attributeDef.getIsIndexable();
-                String            attribTypeName = attributeDef.getTypeName();
-                Class             propertyClass  = indexer.getPrimitiveClass(attribTypeName);
-                AtlasCardinality  cardinality    = indexer.toAtlasCardinality(attributeDef.getCardinality());
-
-                indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
-            }
-
-            indexer.commit(management);
-            graph.commit();
-
-            LOG.info("Unique attributes: type: {}: Registered!", typeName);
-
-            return true;
-        } catch (IndexException e) {
-            LOG.error("Error creating index: type: {}", typeName, e);
-            return false;
-        }
+        LOG.info("UniqueAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
     }
 
     public static class UniqueAttributePatchProcessor {
@@ -127,7 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
         private static final int    NUM_WORKERS;
         private static final int    BATCH_SIZE;
 
-        private final AtlasGraph graph;
+        private final AtlasGraph               graph;
+        private final GraphBackedSearchIndexer indexer;
+        private final AtlasTypeRegistry        typeRegistry;
 
         static {
             int numWorkers = 3;
@@ -146,45 +102,78 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             BATCH_SIZE  = batchSize;
         }
 
-        public UniqueAttributePatchProcessor(AtlasGraph graph) {
-            this.graph = graph;
+        public UniqueAttributePatchProcessor(PatchContext context) {
+            this.graph        = context.getGraph();
+            this.indexer      = context.getIndexer();
+            this.typeRegistry = context.getTypeRegistry();
         }
 
-        public void apply(final Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache) {
-            WorkItemManager manager = null;
+        public void apply() {
+            createIndexForUniqueAttributes();
+            addUniqueAttributeToAllVertices();
+        }
 
+        private void addUniqueAttributeToAllVertices() {
+            Iterable<Object> iterable = graph.query().vertexIds();
+            WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry), BATCH_SIZE, NUM_WORKERS);
             try {
-                Iterator<AtlasVertex> iterator = graph.getVertices().iterator();
+                for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
+                    Object vertexId = iter.next();
+                    submitForProcessing((Long) vertexId, manager);
+                }
 
-                if (iterator.hasNext()) {
-                    manager = new WorkItemManager<>(new ConsumerBuilder(graph), BATCH_SIZE, NUM_WORKERS);
+                manager.drain();
+            } finally {
+                try {
+                    manager.shutdown();
+                } catch (InterruptedException e) {
+                    LOG.error("UniqueAttributePatchProcessor.apply(): interrupted during WorkItemManager shutdown", e);
+                }
+            }
+        }
 
-                    LOG.info("Processing: Started...");
+        private void createIndexForUniqueAttributes() {
+            for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
 
-                    while (iterator.hasNext()) {
-                        AtlasVertex vertex = iterator.next();
+                String typeName = entityType.getTypeName();
+                Collection<AtlasAttribute> uniqAttributes = entityType.getUniqAttributes().values();
 
-                        if (!AtlasGraphUtilsV2.isEntityVertex(vertex)) {
-                            continue;
-                        }
+                if (CollectionUtils.isEmpty(uniqAttributes)) {
+                    LOG.info("UniqueAttributePatchProcessor.apply(): no unique attribute for entity-type {}", typeName);
 
-                        String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
+                    continue;
+                }
 
-                        submitForProcessing(typeName, vertex, manager, typeUniqueAttributeCache.get(typeName));
-                    }
+                createIndexForUniqueAttributes(typeName, uniqAttributes);
+            }
+        }
 
-                    manager.drain();
-                }
-            } catch (Exception ex) {
-                LOG.error("Error: ", ex);
-            } finally {
-                if (manager != null) {
-                    try {
-                        manager.shutdown();
-                    } catch (InterruptedException e) {
-                        LOG.error("Interrupted", e);
+        private void createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
+            try {
+                AtlasGraphManagement management = graph.getManagementSystem();
+
+                for (AtlasAttribute attribute : attributes) {
+                    String uniquePropertyName = attribute.getVertexUniquePropertyName();
+
+                    if (management.getPropertyKey(uniquePropertyName) != null) {
+                        continue;
                     }
+
+                    AtlasAttributeDef attributeDef   = attribute.getAttributeDef();
+                    boolean           isIndexable    = attributeDef.getIsIndexable();
+                    String            attribTypeName = attributeDef.getTypeName();
+                    Class             propertyClass  = indexer.getPrimitiveClass(attribTypeName);
+                    AtlasCardinality  cardinality    = indexer.toAtlasCardinality(attributeDef.getCardinality());
+
+                    indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
                 }
+
+                indexer.commit(management);
+                graph.commit();
+
+                LOG.info("Unique attributes: type: {}: Registered!", typeName);
+            } catch (IndexException e) {
+                LOG.error("Error creating index: type: {}", typeName, e);
             }
         }
 
@@ -192,35 +181,37 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
         }
 
-        private void submitForProcessing(String typeName, AtlasVertex vertex, WorkItemManager manager, Collection<AtlasAttribute> uniqAttributes) {
-            WorkItem workItem = new WorkItem(typeName, (Long) vertex.getId(), uniqAttributes);
-
-            manager.checkProduce(workItem);
+        private void submitForProcessing(Long vertexId, WorkItemManager manager) {
+            manager.checkProduce(vertexId);
         }
 
+        private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
+            private final AtlasTypeRegistry typeRegistry;
+            private final AtlasGraph graph;
 
-        private static class WorkItem {
-            private final String                     typeName;
-            private final long                       id;
-            private final Collection<AtlasAttribute> uniqueAttributeValues;
+            public ConsumerBuilder(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
+                this.graph = graph;
+                this.typeRegistry = typeRegistry;
+            }
 
-            public WorkItem(String typeName, long id, Collection<AtlasAttribute> uniqueAttributeValues) {
-                this.typeName              = typeName;
-                this.id                    = id;
-                this.uniqueAttributeValues = uniqueAttributeValues;
+            @Override
+            public Consumer build(BlockingQueue<Long> queue) {
+                return new Consumer(graph, typeRegistry, queue);
             }
         }
 
-        private static class Consumer extends WorkItemConsumer<WorkItem> {
-            private static int MAX_COMMIT_RETRY_COUNT = 3;
-
+        private static class Consumer extends WorkItemConsumer<Long> {
+            private int MAX_COMMIT_RETRY_COUNT = 3;
             private final AtlasGraph graph;
+            private final AtlasTypeRegistry typeRegistry;
+
             private final AtomicLong counter;
 
-            public Consumer(AtlasGraph graph, BlockingQueue<WorkItem> queue) {
+            public Consumer(AtlasGraph graph, AtlasTypeRegistry typeRegistry, BlockingQueue<Long> queue) {
                 super(queue);
 
-                this.graph   = graph;
+                this.graph        = graph;
+                this.typeRegistry = typeRegistry;
                 this.counter = new AtomicLong(0);
             }
 
@@ -261,19 +252,11 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             }
 
             @Override
-            protected void processItem(WorkItem wi) {
-                counter.incrementAndGet();
-
-                String typeName = wi.typeName;
-
-                if(wi.uniqueAttributeValues == null) {
-                    return;
-                }
-
-                AtlasVertex vertex = graph.getVertex(Long.toString(wi.id));
+            protected void processItem(Long vertexId) {
+                AtlasVertex vertex = graph.getVertex(Long.toString(vertexId));
 
                 if (vertex == null) {
-                    LOG.warn("processItem: AtlasVertex with id: ({}): not found!", wi.id);
+                    LOG.warn("processItem(vertexId={}): AtlasVertex not found!", vertexId);
 
                     return;
                 }
@@ -282,23 +265,30 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
                     return;
                 }
 
-                AtlasEntity.Status status = AtlasGraphUtilsV2.getState(vertex);
+                if (AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
+                    return;
+                }
 
-                if (status != AtlasEntity.Status.ACTIVE) {
+                String          typeName   = AtlasGraphUtilsV2.getTypeName(vertex);
+                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+                if (entityType == null) {
                     return;
                 }
 
+                processItem(vertexId, vertex, typeName, entityType);
+            }
+
+            private void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
                 try {
-                    LOG.debug("processItem: {}", wi.id);
+                    counter.incrementAndGet();
+                    LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
 
-                    for (AtlasAttribute attribute : wi.uniqueAttributeValues) {
+                    for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
                         String                       uniquePropertyKey = attribute.getVertexUniquePropertyName();
                         Collection<? extends String> propertyKeys      = vertex.getPropertyKeys();
                         Object                       uniqAttrValue     = null;
 
-                        if (propertyKeys != null && propertyKeys.contains(uniquePropertyKey)) {
-                            LOG.debug("processItem: {}: Skipped!", wi.id);
-                        } else {
+                        if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
                             try {
                                 String propertyKey = attribute.getVertexPropertyName();
 
@@ -307,50 +297,18 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
                                 AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
                             } catch(AtlasSchemaViolationException ex) {
                                 LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
+                                vertex.removeProperty(uniquePropertyKey);
                             }
                         }
-
-                        commit();
                     }
 
-                    LOG.debug("processItem: {}: Done!", wi.id);
+                    LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
                 } catch (Exception ex) {
-                    LOG.error("Error found: {}: {}", typeName, wi.id, ex);
+                    LOG.error("processItem(typeName={}, vertexId={}): failed!", typeName, vertexId, ex);
+                } finally {
+                    commit();
                 }
             }
         }
-
-        private class ConsumerBuilder implements WorkItemBuilder<Consumer, WorkItem> {
-            private final AtlasGraph graph;
-
-            public ConsumerBuilder(AtlasGraph graph) {
-                this.graph = graph;
-            }
-
-            @Override
-            public Consumer build(BlockingQueue<WorkItem> queue) {
-                return new Consumer(graph, queue);
-            }
-        }
-    }
-
-    public static class TypeNameAttributeCache {
-        private Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache = new HashMap<>();
-
-        public void add(AtlasEntityType entityType, Collection<AtlasAttribute> values) {
-            typeUniqueAttributeCache.put(entityType.getTypeName(), values);
-        }
-
-        public Collection<AtlasAttribute> get(String typeName) {
-            return typeUniqueAttributeCache.get(typeName);
-        }
-
-        public boolean has(String typeName) {
-            return typeUniqueAttributeCache.containsKey(typeName);
-        }
-
-        public Map<String, Collection<AtlasAttribute>> getAll() {
-            return typeUniqueAttributeCache;
-        }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 662edc9..c3356ed 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -76,6 +76,7 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED;
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
 
 /**
  * Class that handles initial loading of models and patches into typedef store
@@ -475,7 +476,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
                                                patch.getId(), status.toString(), patch.getAction(), patchFile);
                                 }
 
-                                patchRegistry.register(patch.id, patch.description, patch.action, status);
+                                patchRegistry.register(patch.id, patch.description, TYPEDEF_PATCH_TYPE, patch.action, status);
                                 LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile);
                             } else {
                                 LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString());
@@ -783,7 +784,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
         public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
             String           typeName       = patch.getTypeName();
             AtlasBaseTypeDef typeDef        = typeRegistry.getTypeDefByName(typeName);
-            PatchStatus      ret            = null;
+            PatchStatus      ret            = UNKNOWN;
 
             if (typeDef == null) {
                 throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName);
diff --git a/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
index 58396e5..bb42bb7 100644
--- a/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
+++ b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
@@ -27,6 +27,7 @@ import org.testng.annotations.Test;
 
 import javax.inject.Inject;
 
+import static org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer.TYPEDEF_PATCH_TYPE;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
@@ -46,7 +47,7 @@ public class AtlasPatchRegistryTest {
     public void registerPatch() {
         AtlasPatchRegistry registry = new AtlasPatchRegistry(graph);
 
-        registry.register("1", "test patch", "apply", AtlasPatch.PatchStatus.UNKNOWN);
+        registry.register("1", "test patch", TYPEDEF_PATCH_TYPE, "apply", AtlasPatch.PatchStatus.UNKNOWN);
 
         assertPatches(registry, 1);
     }