You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/02/22 01:59:15 UTC
[atlas] branch branch-1.0 updated: ATLAS-3055: updated entity create/update to handle relationship attributes consistently
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-1.0 by this push:
new 14f0c67 ATLAS-3055: updated entity create/update to handle relationship attributes consistently
14f0c67 is described below
commit 14f0c6707db37ba6e3e575a56228037f8eaa19d0
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Thu Feb 21 09:54:05 2019 -0800
ATLAS-3055: updated entity create/update to handle relationship attributes consistently
(cherry picked from commit 18019733f28448458e2cad94f5e67aabd18316be)
---
.../org/apache/atlas/type/AtlasEntityType.java | 6 +-
.../apache/atlas/GraphTransactionInterceptor.java | 22 ++++++
.../org/apache/atlas/glossary/GlossaryUtils.java | 8 +-
.../graph/v2/AtlasEntityGraphDiscoveryV2.java | 9 ++-
.../store/graph/v2/AtlasEntityStoreV2.java | 31 ++++++--
.../store/graph/v2/AtlasGraphUtilsV2.java | 19 +++--
.../store/graph/v2/EntityGraphMapper.java | 92 ++++++++++++++++------
7 files changed, 138 insertions(+), 49 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 7166caa..8960703 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -709,10 +709,10 @@ public class AtlasEntityType extends AtlasStructType {
String attributeName = attribute.getName();
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
- if (((AtlasEntity) obj).hasRelationshipAttribute(attributeName)) {
- Object attributeValue = getNormalizedValue(entityObj.getAttribute(attributeName), attributeDef);
+ if (entityObj.hasRelationshipAttribute(attributeName)) {
+ Object attributeValue = getNormalizedValue(entityObj.getRelationshipAttribute(attributeName), attributeDef);
- obj.setAttribute(attributeName, attributeValue);
+ entityObj.setRelationshipAttribute(attributeName, attributeValue);
}
}
}
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index cbd2226..d0db58a 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -23,6 +23,7 @@ import org.aopalliance.intercept.MethodInvocation;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.exception.NotFoundException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import javax.ws.rs.core.Response;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,6 +49,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>();
private static final ThreadLocal<Boolean> isTxnOpen = ThreadLocal.withInitial(() -> Boolean.FALSE);
private static final ThreadLocal<Boolean> innerFailure = ThreadLocal.withInitial(() -> Boolean.FALSE);
+ private static final ThreadLocal<Map<String, AtlasVertex>> guidVertexCache = ThreadLocal.withInitial(() -> new HashMap<>());
private final AtlasGraph graph;
@@ -112,6 +115,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
// Reset the boolean flags
isTxnOpen.set(Boolean.FALSE);
innerFailure.set(Boolean.FALSE);
+ guidVertexCache.get().clear();
List<PostTransactionHook> trxHooks = postTransactionHooks.get();
@@ -172,6 +176,24 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
OBJECT_UPDATE_SYNCHRONIZER.lockObject(guids);
}
+ public static void addToVertexCache(String guid, AtlasVertex vertex) {
+ Map<String, AtlasVertex> cache = guidVertexCache.get();
+
+ cache.put(guid, vertex);
+ }
+
+ public static void removeFromVertexCache(String guid) {
+ Map<String, AtlasVertex> cache = guidVertexCache.get();
+
+ cache.remove(guid);
+ }
+
+ public static AtlasVertex getVertexFromCache(String guid) {
+ Map<String, AtlasVertex> cache = guidVertexCache.get();
+
+ return cache.get(guid);
+ }
+
boolean logException(Throwable t) {
if (t instanceof AtlasBaseException) {
Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryUtils.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryUtils.java
index cec4c23..9625f94 100644
--- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryUtils.java
+++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryUtils.java
@@ -88,13 +88,7 @@ public abstract class GlossaryUtils {
protected void createRelationship(AtlasRelationship relationship) throws AtlasBaseException {
- try {
- relationshipStore.create(relationship);
- } catch (AtlasBaseException e) {
- if (!e.getAtlasErrorCode().equals(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS)) {
- throw e;
- }
- }
+ relationshipStore.getOrCreate(relationship);
}
protected void updateRelationshipAttributes(AtlasRelationship relationship, AtlasRelatedTermHeader relatedTermHeader) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
index ddab2bf..23dc83a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
@@ -331,12 +331,15 @@ public class AtlasEntityGraphDiscoveryV2 implements EntityGraphDiscovery {
private void visitRelationships(AtlasEntityType entityType, AtlasEntity entity, List<String> visitedAttributes) throws AtlasBaseException {
for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
- AtlasType attrType = attribute.getAttributeType();
String attrName = attribute.getName();
- Object attrVal = entity.getRelationshipAttribute(attrName);
+ // if attribute is not in 'relationshipAttributes', try 'attributes'
if (entity.hasRelationshipAttribute(attrName)) {
- visitAttribute(attrType, attrVal);
+ visitAttribute(attribute.getAttributeType(), entity.getRelationshipAttribute(attrName));
+
+ visitedAttributes.add(attrName);
+ } else if (entity.hasAttribute(attrName)) {
+ visitAttribute(attribute.getAttributeType(), entity.getAttribute(attrName));
visitedAttributes.add(attrName);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 8af264b..a622fb5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -727,15 +727,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
boolean hasUpdates = false;
- if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
- hasUpdates = true; // if relationship attributes are provided, assume there is an update
- }
-
if (!hasUpdates) {
hasUpdates = entity.getStatus() == AtlasEntity.Status.DELETED; // entity status could be updated during import
}
- if (!hasUpdates) {
+ if (!hasUpdates && MapUtils.isNotEmpty(entity.getAttributes())) { // check for attribute value change
for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
if (!entity.getAttributes().containsKey(attribute.getName())) { // if value is not provided, current value will not be updated
continue;
@@ -756,6 +752,27 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
}
+ if (!hasUpdates && MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { // check for relationship-attribute value change
+ for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
+ if (!entity.getRelationshipAttributes().containsKey(attribute.getName())) { // if value is not provided, current value will not be updated
+ continue;
+ }
+
+ Object newVal = entity.getRelationshipAttribute(attribute.getName());
+ Object currVal = entityRetriever.getEntityAttribute(vertex, attribute);
+
+ if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) {
+ hasUpdates = true;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("found relationship attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal);
+ }
+
+ break;
+ }
+ }
+ }
+
// if classifications are to be replaced, then skip updates only when no change in classifications
if (!hasUpdates && replaceClassifications) {
List<AtlasClassification> newVal = entity.getClassifications();
@@ -775,7 +792,9 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
entitiesToSkipUpdate = new ArrayList<>();
}
- LOG.info("skipping unchanged entity: {}", entity);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("skipping unchanged entity: {}", entity);
+ }
entitiesToSkipUpdate.add(entity);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
index 2548537..798b362 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.discovery.SearchProcessor;
@@ -324,14 +325,22 @@ public class AtlasGraphUtilsV2 {
}
public static AtlasVertex findByGuid(String guid) {
- AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
- .has(Constants.GUID_PROPERTY_KEY, guid);
+ AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid);
- Iterator<AtlasVertex> results = query.vertices().iterator();
+ if (ret == null) {
+ AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
+ .has(Constants.GUID_PROPERTY_KEY, guid);
- AtlasVertex vertex = results.hasNext() ? results.next() : null;
+ Iterator<AtlasVertex> results = query.vertices().iterator();
- return vertex;
+ ret = results.hasNext() ? results.next() : null;
+
+ if (ret != null) {
+ GraphTransactionInterceptor.addToVertexCache(guid, ret);
+ }
+ }
+
+ return ret;
}
public static String getTypeNameFromGuid(String guid) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 173fe02..baaca0b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TimeBoundary;
@@ -135,6 +136,8 @@ public class EntityGraphMapper {
AtlasGraphUtilsV2.setEncodedProperty(ret, GUID_PROPERTY_KEY, guid);
AtlasGraphUtilsV2.setEncodedProperty(ret, VERSION_PROPERTY_KEY, getEntityVersion(entity));
+ GraphTransactionInterceptor.addToVertexCache(guid, ret);
+
return ret;
}
@@ -186,11 +189,11 @@ public class EntityGraphMapper {
AtlasVertex vertex = context.getVertex(guid);
AtlasEntityType entityType = context.getType(guid);
- compactAttributes(createdEntity);
+ compactAttributes(createdEntity, entityType);
- mapRelationshipAttributes(createdEntity, vertex, CREATE, context);
+ mapRelationshipAttributes(createdEntity, entityType, vertex, CREATE, context);
- mapAttributes(createdEntity, vertex, CREATE, context);
+ mapAttributes(createdEntity, entityType, vertex, CREATE, context);
resp.addEntity(CREATE, constructHeader(createdEntity, entityType, vertex));
addClassifications(context, guid, createdEntity.getClassifications());
@@ -203,11 +206,11 @@ public class EntityGraphMapper {
AtlasVertex vertex = context.getVertex(guid);
AtlasEntityType entityType = context.getType(guid);
- compactAttributes(updatedEntity);
+ compactAttributes(updatedEntity, entityType);
- mapRelationshipAttributes(updatedEntity, vertex, UPDATE, context);
+ mapRelationshipAttributes(updatedEntity, entityType, vertex, UPDATE, context);
- mapAttributes(updatedEntity, vertex, UPDATE, context);
+ mapAttributes(updatedEntity, entityType, vertex, UPDATE, context);
if (isPartialUpdate) {
resp.addEntity(PARTIAL_UPDATE, constructHeader(updatedEntity, entityType, vertex));
@@ -283,8 +286,11 @@ public class EntityGraphMapper {
return ret;
}
-
private void mapAttributes(AtlasStruct struct, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
+ mapAttributes(struct, getStructType(struct.getTypeName()), vertex, op, context);
+ }
+
+ private void mapAttributes(AtlasStruct struct, AtlasStructType structType, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> mapAttributes({}, {})", op, struct.getTypeName());
}
@@ -292,8 +298,6 @@ public class EntityGraphMapper {
if (MapUtils.isNotEmpty(struct.getAttributes())) {
MetricRecorder metric = RequestContext.get().startMetricRecord("mapAttributes");
- AtlasStructType structType = getStructType(struct.getTypeName());
-
if (op.equals(CREATE)) {
for (AtlasAttribute attribute : structType.getAllAttributes().values()) {
Object attrValue = struct.getAttribute(attribute.getName());
@@ -325,7 +329,7 @@ public class EntityGraphMapper {
}
}
- private void mapRelationshipAttributes(AtlasEntity entity, AtlasVertex vertex, EntityOperation op,
+ private void mapRelationshipAttributes(AtlasEntity entity, AtlasEntityType entityType, AtlasVertex vertex, EntityOperation op,
EntityMutationContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> mapRelationshipAttributes({}, {})", op, entity.getTypeName());
@@ -334,8 +338,6 @@ public class EntityGraphMapper {
if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
MetricRecorder metric = RequestContext.get().startMetricRecord("mapRelationshipAttributes");
- AtlasEntityType entityType = getEntityType(entity.getTypeName());
-
if (op.equals(CREATE)) {
for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
Object attrValue = entity.getRelationshipAttribute(attribute.getName());
@@ -439,7 +441,7 @@ public class EntityGraphMapper {
AtlasEdge newEdge = null;
if (ctx.getValue() != null) {
- AtlasEntityType instanceType = getInstanceType(ctx.getValue());
+ AtlasEntityType instanceType = getInstanceType(ctx.getValue(), context);
AtlasEdge edge = currentEdge != null ? currentEdge : null;
ctx.setElementType(instanceType);
@@ -1090,7 +1092,7 @@ public class EntityGraphMapper {
return mapStructValue(ctx, context);
case OBJECT_ID_TYPE:
- AtlasEntityType instanceType = getInstanceType(ctx.getValue());
+ AtlasEntityType instanceType = getInstanceType(ctx.getValue(), context);
ctx.setElementType(instanceType);
if (ctx.getAttributeDef().isSoftReferenced()) {
return mapSoftRefValue(ctx, context);
@@ -1163,23 +1165,50 @@ public class EntityGraphMapper {
return null;
}
- private AtlasEntityType getInstanceType(Object val) throws AtlasBaseException {
+ private AtlasEntityType getInstanceType(Object val, EntityMutationContext context) throws AtlasBaseException {
AtlasEntityType ret = null;
if (val != null) {
String typeName = null;
+ String guid = null;
if (val instanceof AtlasObjectId) {
- typeName = ((AtlasObjectId)val).getTypeName();
+ AtlasObjectId objId = (AtlasObjectId) val;
+
+ typeName = objId.getTypeName();
+ guid = objId.getGuid();
} else if (val instanceof Map) {
- Object typeNameVal = ((Map)val).get(AtlasObjectId.KEY_TYPENAME);
+ Map map = (Map) val;
+
+ Object typeNameVal = map.get(AtlasObjectId.KEY_TYPENAME);
+ Object guidVal = map.get(AtlasObjectId.KEY_GUID);
if (typeNameVal != null) {
typeName = typeNameVal.toString();
}
+
+ if (guidVal != null) {
+ guid = guidVal.toString();
+ }
}
- ret = typeName != null ? typeRegistry.getEntityTypeByName(typeName) : null;
+ if (typeName == null) {
+ if (guid != null) {
+ ret = context.getType(guid);
+
+ if (ret == null) {
+ AtlasVertex vertex = context.getDiscoveryContext().getResolvedEntityVertex(guid);
+
+ if (vertex != null) {
+ typeName = AtlasGraphUtilsV2.getTypeName(vertex);
+ }
+ }
+ }
+ }
+
+ if (ret == null && typeName != null) {
+ ret = typeRegistry.getEntityTypeByName(typeName);
+ }
if (ret == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
@@ -1868,15 +1897,28 @@ public class EntityGraphMapper {
}
}
- private static void compactAttributes(AtlasEntity entity) {
+ // move/remove relationship-attributes present in 'attributes'
+ private static void compactAttributes(AtlasEntity entity, AtlasEntityType entityType) {
if (entity != null) {
- Map<String, Object> relationshipAttributes = entity.getRelationshipAttributes();
- Map<String, Object> attributes = entity.getAttributes();
+ for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
+ String attrName = attribute.getName();
+
+ if (entity.hasAttribute(attrName)) {
+ Object attrValue = entity.getAttribute(attrName);
- if (MapUtils.isNotEmpty(relationshipAttributes) && MapUtils.isNotEmpty(attributes)) {
- for (String attrName : relationshipAttributes.keySet()) {
- if (attributes.containsKey(attrName)) {
- entity.removeAttribute(attrName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("relationship attribute {}.{} is present in entity, removing it", entityType.getTypeName(), attrName);
+ }
+
+ entity.removeAttribute(attrName);
+
+ if (attrValue != null) { // relationship attribute is present in 'attributes'
+ // if the attribute doesn't exist in relationshipAttributes, add it
+ Object relationshipAttrValue = entity.getRelationshipAttribute(attrName);
+
+ if (relationshipAttrValue == null) {
+ entity.setRelationshipAttribute(attrName, attrValue);
+ }
}
}
}