You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this export.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/04/17 23:35:31 UTC
[atlas] 01/02: ATLAS-3143: PatchFx: Memory usage and performance improvement.
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 8cf09c51e9a8bf4e9077415f0ce21758fc90e5a9
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Wed Apr 17 15:15:37 2019 -0700
ATLAS-3143: PatchFx: Memory usage and performance improvement.
(cherry picked from commit 0bb18f08e6b63ce344d6b01b744d6ce492e16f50)
---
.../atlas/repository/graphdb/AtlasGraphQuery.java | 21 ++
.../tinkerpop/query/NativeTinkerpopGraphQuery.java | 21 ++
.../tinkerpop/query/TinkerpopGraphQuery.java | 57 +++++
.../graphdb/janus/query/NativeJanusGraphQuery.java | 58 +++++
.../java/org/apache/atlas/pc/WorkItemConsumer.java | 4 +-
.../repository/patches/AtlasPatchHandler.java | 4 +-
.../repository/patches/AtlasPatchRegistry.java | 18 +-
.../repository/patches/UniqueAttributePatch.java | 270 +++++++++------------
.../bootstrap/AtlasTypeDefStoreInitializer.java | 5 +-
.../atlas/patches/AtlasPatchRegistryTest.java | 3 +-
10 files changed, 293 insertions(+), 168 deletions(-)
diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
index 883a31b..1ef4615 100644
--- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
+++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphQuery.java
@@ -96,6 +96,27 @@ public interface AtlasGraphQuery<V, E> {
*/
Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
+ /**
+ * Executes the query and returns IDs of matching vertices.
+ * @return
+ */
+ Iterable<Object> vertexIds();
+
+ /**
+ * Executes the query and returns IDs of the matching vertices from given offset till the max limit
+ * @param limit max number of vertices
+ * @return
+ */
+ Iterable<Object> vertexIds(int limit);
+
+ /**
+ * Executes the query and returns IDs of the matching vertices from given offset till the max limit
+ * @param offset starting offset
+ * @param limit max number of vertices
+ * @return
+ */
+ Iterable<Object> vertexIds(int offset, int limit);
+
/**
* Adds a predicate that the returned vertices must have the specified
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java
index 19ec5e4..27480e6 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/NativeTinkerpopGraphQuery.java
@@ -77,6 +77,27 @@ public interface NativeTinkerpopGraphQuery<V, E> {
*/
Iterable<AtlasVertex<V, E>> vertices(int offset, int limit);
+ /**
+ * Executes the graph query.
+ * @return
+ */
+ Iterable<Object> vertexIds();
+
+ /**
+ * Executes graph query
+ * @param limit Max vertices to return
+ * @return
+ */
+ Iterable<Object> vertexIds(int limit);
+
+ /**
+ * Executes graph query
+ * @param offset Starting offset
+ * @param limit Max vertices to return
+ * @return
+ */
+ Iterable<Object> vertexIds(int offset, int limit);
+
/**
* Adds an in condition to the query.
diff --git a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java
index 5101039..c70e8bf 100644
--- a/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java
+++ b/graphdb/common/src/main/java/org/apache/atlas/repository/graphdb/tinkerpop/query/TinkerpopGraphQuery.java
@@ -237,6 +237,63 @@ public abstract class TinkerpopGraphQuery<V, E> implements AtlasGraphQuery<V, E>
}
@Override
+ public Iterable<Object> vertexIds() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing: " + queryCondition);
+ }
+
+ // Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
+ Set<Object> result = new HashSet<>();
+
+ for(AndCondition andExpr : queryCondition.getAndTerms()) {
+ NativeTinkerpopGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
+ for(Object vertexId : andQuery.vertexIds()) {
+ result.add(vertexId);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Iterable<Object> vertexIds(int limit) {
+ return vertexIds(0, limit);
+ }
+
+ @Override
+ public Iterable<Object> vertexIds(int offset, int limit) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing: " + queryCondition);
+ }
+
+ Preconditions.checkArgument(offset >= 0, "Offset must be non-negative");
+ Preconditions.checkArgument(limit >= 0, "Limit must be non-negative");
+
+ // Compute the overall result by combining the results of all the AndConditions (nested within OR) together.
+ Set<Object> result = new HashSet<>();
+ long resultIdx = 0;
+ for(AndCondition andExpr : queryCondition.getAndTerms()) {
+ if (result.size() == limit) {
+ break;
+ }
+
+ NativeTinkerpopGraphQuery<V, E> andQuery = andExpr.create(getQueryFactory());
+ for(Object vertexId : andQuery.vertexIds(offset + limit)) {
+ if (resultIdx >= offset) {
+ result.add(vertexId);
+
+ if (result.size() == limit) {
+ break;
+ }
+ }
+
+ resultIdx++;
+ }
+ }
+
+ return result;
+ }
+
+ @Override
public AtlasGraphQuery<V, E> has(String propertyKey, QueryOperator operator,
Object value) {
queryCondition.andWith(new HasPredicate(propertyKey, operator, value));
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java
index 580caa5..a7a169a 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/query/NativeJanusGraphQuery.java
@@ -158,6 +158,64 @@ public class NativeJanusGraphQuery implements NativeTinkerpopGraphQuery<AtlasJan
}
@Override
+ public Iterable<Object> vertexIds() {
+ Set<Object> result = new HashSet<>();
+ Iterable<JanusGraphVertex> it = query.vertices();
+
+ for (Iterator<? extends Vertex> iter = it.iterator(); iter.hasNext(); ) {
+ result.add(iter.next().id());
+ }
+
+ return result;
+ }
+
+ @Override
+ public Iterable<Object> vertexIds(int limit) {
+ Set<Object> result = new HashSet<>(limit);
+ Iterable<JanusGraphVertex> it = query.limit(limit).vertices();
+
+ if (LOG.isDebugEnabled()) {
+ if (query instanceof GraphCentricQueryBuilder) {
+ LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.VERTEX));
+ } else {
+ LOG.debug("NativeJanusGraphQuery.vertices({}): resultSize={}, {}", limit, getCountForDebugLog(it), query);
+ }
+ }
+
+ for (Iterator<? extends Vertex> iter = it.iterator(); iter.hasNext(); ) {
+ result.add(iter.next().id());
+ }
+
+ return result;
+ }
+
+ @Override
+ public Iterable<Object> vertexIds(int offset, int limit) {
+ Set<Object> result = new HashSet<>(limit);
+ Iterable<JanusGraphVertex> it = query.limit(offset + limit).vertices();
+
+ if (LOG.isDebugEnabled()) {
+ if (query instanceof GraphCentricQueryBuilder) {
+ LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), ((GraphCentricQueryBuilder) query).constructQuery(ElementCategory.VERTEX));
+ } else {
+ LOG.debug("NativeJanusGraphQuery.vertices({}, {}): resultSize={}, {}", offset, limit, getCountForDebugLog(it), query);
+ }
+ }
+
+ Iterator<? extends Vertex> iter = it.iterator();
+
+ for (long resultIdx = 0; iter.hasNext() && result.size() < limit; resultIdx++) {
+ if (resultIdx < offset) {
+ continue;
+ }
+
+ result.add(iter.next().id());
+ }
+
+ return result;
+ }
+
+ @Override
public void in(String propertyName, Collection<? extends Object> values) {
query.has(propertyName, Contain.IN, values);
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
index 8351b7c..9ba4bf4 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
public abstract class WorkItemConsumer<T> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class);
- private static final int POLLING_DURATION_SECONDS = 30;
+ private static final int POLLING_DURATION_SECONDS = 5;
private static final int DEFAULT_COMMIT_TIME_IN_MS = 15000;
private final BlockingQueue<T> queue;
@@ -50,7 +50,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS);
if (item == null) {
- LOG.warn("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing");
+ LOG.debug("WorkItemConsumer.run(): no more items found in the queue. Will exit after committing");
commitDirty();
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
index d8dcfef..3e2bf53 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchHandler.java
@@ -41,8 +41,8 @@ public abstract class AtlasPatchHandler {
private void register() {
PatchStatus patchStatus = getStatus();
- if (patchStatus == UNKNOWN) {
- patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, getStatus());
+ if (patchStatus == null || patchStatus == UNKNOWN) {
+ patchRegistry.register(patchId, patchDescription, JAVA_PATCH_TYPE, "apply", UNKNOWN);
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
index df57959..80c2201 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchRegistry.java
@@ -59,8 +59,16 @@ public class AtlasPatchRegistry {
private final AtlasGraph graph;
public AtlasPatchRegistry(AtlasGraph graph) {
+ LOG.info("AtlasPatchRegistry: initializing..");
+
this.graph = graph;
this.patchNameStatusMap = getPatchNameStatusForAllRegistered(graph);
+
+ LOG.info("AtlasPatchRegistry: found {} patches", patchNameStatusMap.size());
+
+ for (Map.Entry<String, PatchStatus> entry : patchNameStatusMap.entrySet()) {
+ LOG.info("AtlasPatchRegistry: patchId={}, status={}", entry.getKey(), entry.getValue());
+ }
}
public boolean isApplicable(String incomingId, String patchFile, int index) {
@@ -83,8 +91,8 @@ public class AtlasPatchRegistry {
return patchNameStatusMap.get(id);
}
- public void register(String patchId, String description, String action, PatchStatus patchStatus) {
- createOrUpdatePatchVertex(graph, patchId, description, action, patchStatus);
+ public void register(String patchId, String description, String patchType, String action, PatchStatus patchStatus) {
+ createOrUpdatePatchVertex(graph, patchId, description, patchType, action, patchStatus);
}
public void updateStatus(String patchId, PatchStatus patchStatus) {
@@ -118,14 +126,14 @@ public class AtlasPatchRegistry {
return getAllPatches(graph);
}
- private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId,
- String description, String action, PatchStatus patchStatus) {
+ private void createOrUpdatePatchVertex(AtlasGraph graph, String patchId, String description,
+ String patchType, String action, PatchStatus patchStatus) {
boolean isPatchRegistered = MapUtils.isNotEmpty(patchNameStatusMap) && patchNameStatusMap.containsKey(patchId);
AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex();
setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, description);
- setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, TYPEDEF_PATCH_TYPE);
+ setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, patchType);
setEncodedProperty(patchVertex, PATCH_ACTION_PROPERTY_KEY, action);
setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index c5af500..9338e74 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -27,19 +27,22 @@ import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
-import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.repository.graphdb.AtlasCardinality;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -52,72 +55,23 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
private static final String PATCH_ID = "JAVA_PATCH_0000_001";
private static final String PATCH_DESCRIPTION = "Add __u_ property for each unique attribute of active entities";
- private final AtlasGraph graph;
- private final GraphBackedSearchIndexer indexer;
- private final AtlasTypeRegistry typeRegistry;
+ private final PatchContext context;
public UniqueAttributePatch(PatchContext context) {
super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
- this.graph = context.getGraph();
- this.indexer = context.getIndexer();
- this.typeRegistry = context.getTypeRegistry();
+ this.context = context;
}
@Override
public void apply() {
- TypeNameAttributeCache typeNameAttributeCache = registerUniqueAttributeForTypes();
- UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(this.graph);
+ UniqueAttributePatchProcessor patchProcessor = new UniqueAttributePatchProcessor(context);
- patchProcessor.apply(typeNameAttributeCache.getAll());
+ patchProcessor.apply();
setStatus(APPLIED);
- LOG.info("UniqueAttributePatch: {}; status: {}", getPatchId(), getStatus());
- }
-
- private TypeNameAttributeCache registerUniqueAttributeForTypes() {
- TypeNameAttributeCache ret = new TypeNameAttributeCache();
-
- for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
- createIndexForUniqueAttributes(entityType.getTypeName(), entityType.getUniqAttributes().values());
-
- ret.add(entityType, entityType.getUniqAttributes().values());
- }
-
- return ret;
- }
-
- private boolean createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
- try {
- AtlasGraphManagement management = graph.getManagementSystem();
-
- for (AtlasAttribute attribute : attributes) {
- String uniquePropertyName = attribute.getVertexUniquePropertyName();
-
- if (management.getPropertyKey(uniquePropertyName) != null) {
- continue;
- }
-
- AtlasAttributeDef attributeDef = attribute.getAttributeDef();
- boolean isIndexable = attributeDef.getIsIndexable();
- String attribTypeName = attributeDef.getTypeName();
- Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
- AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
-
- indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
- }
-
- indexer.commit(management);
- graph.commit();
-
- LOG.info("Unique attributes: type: {}: Registered!", typeName);
-
- return true;
- } catch (IndexException e) {
- LOG.error("Error creating index: type: {}", typeName, e);
- return false;
- }
+ LOG.info("UniqueAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
}
public static class UniqueAttributePatchProcessor {
@@ -127,7 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
private static final int NUM_WORKERS;
private static final int BATCH_SIZE;
- private final AtlasGraph graph;
+ private final AtlasGraph graph;
+ private final GraphBackedSearchIndexer indexer;
+ private final AtlasTypeRegistry typeRegistry;
static {
int numWorkers = 3;
@@ -146,45 +102,78 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
BATCH_SIZE = batchSize;
}
- public UniqueAttributePatchProcessor(AtlasGraph graph) {
- this.graph = graph;
+ public UniqueAttributePatchProcessor(PatchContext context) {
+ this.graph = context.getGraph();
+ this.indexer = context.getIndexer();
+ this.typeRegistry = context.getTypeRegistry();
}
- public void apply(final Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache) {
- WorkItemManager manager = null;
+ public void apply() {
+ createIndexForUniqueAttributes();
+ addUniqueAttributeToAllVertices();
+ }
+ private void addUniqueAttributeToAllVertices() {
+ Iterable<Object> iterable = graph.query().vertexIds();
+ WorkItemManager manager = new WorkItemManager(new ConsumerBuilder(graph, typeRegistry), BATCH_SIZE, NUM_WORKERS);
try {
- Iterator<AtlasVertex> iterator = graph.getVertices().iterator();
+ for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
+ Object vertexId = iter.next();
+ submitForProcessing((Long) vertexId, manager);
+ }
- if (iterator.hasNext()) {
- manager = new WorkItemManager<>(new ConsumerBuilder(graph), BATCH_SIZE, NUM_WORKERS);
+ manager.drain();
+ } finally {
+ try {
+ manager.shutdown();
+ } catch (InterruptedException e) {
+ LOG.error("UniqueAttributePatchProcessor.apply(): interrupted during WorkItemManager shutdown", e);
+ }
+ }
+ }
- LOG.info("Processing: Started...");
+ private void createIndexForUniqueAttributes() {
+ for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
- while (iterator.hasNext()) {
- AtlasVertex vertex = iterator.next();
+ String typeName = entityType.getTypeName();
+ Collection<AtlasAttribute> uniqAttributes = entityType.getUniqAttributes().values();
- if (!AtlasGraphUtilsV2.isEntityVertex(vertex)) {
- continue;
- }
+ if (CollectionUtils.isEmpty(uniqAttributes)) {
+ LOG.info("UniqueAttributePatchProcessor.apply(): no unique attribute for entity-type {}", typeName);
- String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
+ continue;
+ }
- submitForProcessing(typeName, vertex, manager, typeUniqueAttributeCache.get(typeName));
- }
+ createIndexForUniqueAttributes(typeName, uniqAttributes);
+ }
+ }
- manager.drain();
- }
- } catch (Exception ex) {
- LOG.error("Error: ", ex);
- } finally {
- if (manager != null) {
- try {
- manager.shutdown();
- } catch (InterruptedException e) {
- LOG.error("Interrupted", e);
+ private void createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) {
+ try {
+ AtlasGraphManagement management = graph.getManagementSystem();
+
+ for (AtlasAttribute attribute : attributes) {
+ String uniquePropertyName = attribute.getVertexUniquePropertyName();
+
+ if (management.getPropertyKey(uniquePropertyName) != null) {
+ continue;
}
+
+ AtlasAttributeDef attributeDef = attribute.getAttributeDef();
+ boolean isIndexable = attributeDef.getIsIndexable();
+ String attribTypeName = attributeDef.getTypeName();
+ Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
+ AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
+
+ indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.PER_TYPE_UNIQUE, propertyClass, cardinality, isIndexable, true);
}
+
+ indexer.commit(management);
+ graph.commit();
+
+ LOG.info("Unique attributes: type: {}: Registered!", typeName);
+ } catch (IndexException e) {
+ LOG.error("Error creating index: type: {}", typeName, e);
}
}
@@ -192,35 +181,37 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
}
- private void submitForProcessing(String typeName, AtlasVertex vertex, WorkItemManager manager, Collection<AtlasAttribute> uniqAttributes) {
- WorkItem workItem = new WorkItem(typeName, (Long) vertex.getId(), uniqAttributes);
-
- manager.checkProduce(workItem);
+ private void submitForProcessing(Long vertexId, WorkItemManager manager) {
+ manager.checkProduce(vertexId);
}
+ private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
+ private final AtlasTypeRegistry typeRegistry;
+ private final AtlasGraph graph;
- private static class WorkItem {
- private final String typeName;
- private final long id;
- private final Collection<AtlasAttribute> uniqueAttributeValues;
+ public ConsumerBuilder(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
+ this.graph = graph;
+ this.typeRegistry = typeRegistry;
+ }
- public WorkItem(String typeName, long id, Collection<AtlasAttribute> uniqueAttributeValues) {
- this.typeName = typeName;
- this.id = id;
- this.uniqueAttributeValues = uniqueAttributeValues;
+ @Override
+ public Consumer build(BlockingQueue<Long> queue) {
+ return new Consumer(graph, typeRegistry, queue);
}
}
- private static class Consumer extends WorkItemConsumer<WorkItem> {
- private static int MAX_COMMIT_RETRY_COUNT = 3;
-
+ private static class Consumer extends WorkItemConsumer<Long> {
+ private int MAX_COMMIT_RETRY_COUNT = 3;
private final AtlasGraph graph;
+ private final AtlasTypeRegistry typeRegistry;
+
private final AtomicLong counter;
- public Consumer(AtlasGraph graph, BlockingQueue<WorkItem> queue) {
+ public Consumer(AtlasGraph graph, AtlasTypeRegistry typeRegistry, BlockingQueue<Long> queue) {
super(queue);
- this.graph = graph;
+ this.graph = graph;
+ this.typeRegistry = typeRegistry;
this.counter = new AtomicLong(0);
}
@@ -261,19 +252,11 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
}
@Override
- protected void processItem(WorkItem wi) {
- counter.incrementAndGet();
-
- String typeName = wi.typeName;
-
- if(wi.uniqueAttributeValues == null) {
- return;
- }
-
- AtlasVertex vertex = graph.getVertex(Long.toString(wi.id));
+ protected void processItem(Long vertexId) {
+ AtlasVertex vertex = graph.getVertex(Long.toString(vertexId));
if (vertex == null) {
- LOG.warn("processItem: AtlasVertex with id: ({}): not found!", wi.id);
+ LOG.warn("processItem(vertexId={}): AtlasVertex not found!", vertexId);
return;
}
@@ -282,23 +265,30 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
return;
}
- AtlasEntity.Status status = AtlasGraphUtilsV2.getState(vertex);
+ if (AtlasGraphUtilsV2.getState(vertex) != AtlasEntity.Status.ACTIVE) {
+ return;
+ }
- if (status != AtlasEntity.Status.ACTIVE) {
+ String typeName = AtlasGraphUtilsV2.getTypeName(vertex);
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+ if (entityType == null) {
return;
}
+ processItem(vertexId, vertex, typeName, entityType);
+ }
+
+ private void processItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
try {
- LOG.debug("processItem: {}", wi.id);
+ counter.incrementAndGet();
+ LOG.debug("processItem(typeName={}, vertexId={})", typeName, vertexId);
- for (AtlasAttribute attribute : wi.uniqueAttributeValues) {
+ for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
String uniquePropertyKey = attribute.getVertexUniquePropertyName();
Collection<? extends String> propertyKeys = vertex.getPropertyKeys();
Object uniqAttrValue = null;
- if (propertyKeys != null && propertyKeys.contains(uniquePropertyKey)) {
- LOG.debug("processItem: {}: Skipped!", wi.id);
- } else {
+ if (propertyKeys == null || !propertyKeys.contains(uniquePropertyKey)) {
try {
String propertyKey = attribute.getVertexPropertyName();
@@ -307,50 +297,18 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
AtlasGraphUtilsV2.setEncodedProperty(vertex, uniquePropertyKey, uniqAttrValue);
} catch(AtlasSchemaViolationException ex) {
LOG.error("Duplicates detected: {}:{}:{}", typeName, uniqAttrValue, getIdFromVertex(vertex));
+ vertex.removeProperty(uniquePropertyKey);
}
}
-
- commit();
}
- LOG.debug("processItem: {}: Done!", wi.id);
+ LOG.debug("processItem(typeName={}, vertexId={}): Done!", typeName, vertexId);
} catch (Exception ex) {
- LOG.error("Error found: {}: {}", typeName, wi.id, ex);
+ LOG.error("processItem(typeName={}, vertexId={}): failed!", typeName, vertexId, ex);
+ } finally {
+ commit();
}
}
}
-
- private class ConsumerBuilder implements WorkItemBuilder<Consumer, WorkItem> {
- private final AtlasGraph graph;
-
- public ConsumerBuilder(AtlasGraph graph) {
- this.graph = graph;
- }
-
- @Override
- public Consumer build(BlockingQueue<WorkItem> queue) {
- return new Consumer(graph, queue);
- }
- }
- }
-
- public static class TypeNameAttributeCache {
- private Map<String, Collection<AtlasAttribute>> typeUniqueAttributeCache = new HashMap<>();
-
- public void add(AtlasEntityType entityType, Collection<AtlasAttribute> values) {
- typeUniqueAttributeCache.put(entityType.getTypeName(), values);
- }
-
- public Collection<AtlasAttribute> get(String typeName) {
- return typeUniqueAttributeCache.get(typeName);
- }
-
- public boolean has(String typeName) {
- return typeUniqueAttributeCache.containsKey(typeName);
- }
-
- public Map<String, Collection<AtlasAttribute>> getAll() {
- return typeUniqueAttributeCache;
- }
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 662edc9..c3356ed 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -76,6 +76,7 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.SKIPPED;
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
/**
* Class that handles initial loading of models and patches into typedef store
@@ -475,7 +476,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
patch.getId(), status.toString(), patch.getAction(), patchFile);
}
- patchRegistry.register(patch.id, patch.description, patch.action, status);
+ patchRegistry.register(patch.id, patch.description, TYPEDEF_PATCH_TYPE, patch.action, status);
LOG.info("{} (status: {}; action: {}) in file: {}", patch.getId(), status.toString(), patch.getAction(), patchFile);
} else {
LOG.info("{} in file: {} already {}. Ignoring.", patch.getId(), patchFile, patchRegistry.getStatus(patch.getId()).toString());
@@ -783,7 +784,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
String typeName = patch.getTypeName();
AtlasBaseTypeDef typeDef = typeRegistry.getTypeDefByName(typeName);
- PatchStatus ret = null;
+ PatchStatus ret = UNKNOWN;
if (typeDef == null) {
throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName);
diff --git a/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
index 58396e5..bb42bb7 100644
--- a/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
+++ b/repository/src/test/java/org/apache/atlas/patches/AtlasPatchRegistryTest.java
@@ -27,6 +27,7 @@ import org.testng.annotations.Test;
import javax.inject.Inject;
+import static org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer.TYPEDEF_PATCH_TYPE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -46,7 +47,7 @@ public class AtlasPatchRegistryTest {
public void registerPatch() {
AtlasPatchRegistry registry = new AtlasPatchRegistry(graph);
- registry.register("1", "test patch", "apply", AtlasPatch.PatchStatus.UNKNOWN);
+ registry.register("1", "test patch", TYPEDEF_PATCH_TYPE, "apply", AtlasPatch.PatchStatus.UNKNOWN);
assertPatches(registry, 1);
}