You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2018/02/10 03:18:55 UTC
atlas git commit: ATLAS-2435: performance improvements in
relationship instance create/update, entity update
Repository: atlas
Updated Branches:
refs/heads/master e545c9ffd -> ad6b07a98
ATLAS-2435: performance improvements in relationship instance create/update, entity update
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/ad6b07a9
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/ad6b07a9
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/ad6b07a9
Branch: refs/heads/master
Commit: ad6b07a981510408d0627a7260a6de2aae771f57
Parents: e545c9f
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Thu Feb 8 22:51:46 2018 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Feb 9 19:18:42 2018 -0800
----------------------------------------------------------------------
.../java/org/apache/atlas/AtlasErrorCode.java | 3 +-
.../model/instance/AtlasClassification.java | 4 +
.../atlas/model/instance/AtlasEntity.java | 51 +++++
.../atlas/model/instance/AtlasRelationship.java | 84 ++++++++
.../atlas/model/instance/AtlasStruct.java | 16 ++
.../org/apache/atlas/type/AtlasArrayType.java | 126 ++++++++++++
.../apache/atlas/type/AtlasBuiltInTypes.java | 54 +++++
.../atlas/type/AtlasClassificationType.java | 11 ++
.../org/apache/atlas/type/AtlasEntityType.java | 12 ++
.../org/apache/atlas/type/AtlasMapType.java | 68 +++++++
.../atlas/type/AtlasRelationshipType.java | 97 ++++++---
.../org/apache/atlas/type/AtlasStructType.java | 74 ++++++-
.../java/org/apache/atlas/type/AtlasType.java | 27 +++
.../org/apache/atlas/utils/AtlasEntityUtil.java | 29 +--
.../store/graph/AtlasRelationshipStore.java | 6 +
.../graph/v1/AtlasEntityChangeNotifier.java | 21 +-
.../store/graph/v1/AtlasEntityStoreV1.java | 79 ++++----
.../graph/v1/AtlasRelationshipStoreV1.java | 195 ++++++++++++++-----
.../store/graph/v1/EntityGraphMapper.java | 20 +-
.../store/graph/v1/EntityMutationContext.java | 16 +-
20 files changed, 820 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 7d88547..ff09e6c 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -120,7 +120,8 @@ public enum AtlasErrorCode {
INVALID_DSL_DUPLICATE_ALIAS(400, "ATLAS-400-00-066", "DSL Semantic Error - Duplicate alias found: '{0}' for type '{1}' already present."),
INVALID_DSL_INVALID_DATE(400, "ATLAS-400-00-067", "DSL Semantic Error - Date format: {0}."),
INVALID_DSL_HAS_PROPERTY(400, "ATLAS-400-00-068", "DSL Semantic Error - Property needs to be a primitive type: {0}"),
-
+ RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED(404, "ATLAS-400-00-069", "change of relationship end is not permitted. relationship-type={}, relationship-guid={}, end-guid={}, updated-end-guid={}"),
+ RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED(404, "ATLAS-400-00-06A", "change of relationship type is not permitted. relationship-guid={}, current-type={}, new-type={}"),
// All Not found enums go here
TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"),
TYPE_GUID_NOT_FOUND(404, "ATLAS-404-00-002", "Given type guid {0} was invalid"),
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java
index 1920eda..f594a81 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java
@@ -65,6 +65,10 @@ public class AtlasClassification extends AtlasStruct implements Serializable {
super(typeName, attrName, attrValue);
}
+ public AtlasClassification(Map map) {
+ super(map);
+ }
+
public AtlasClassification(AtlasClassification other) {
if (other != null) {
setTypeName(other.getTypeName());
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 94b0d22..08d1ce1 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -59,6 +59,15 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
public class AtlasEntity extends AtlasStruct implements Serializable {
private static final long serialVersionUID = 1L;
+ public static final String KEY_GUID = "guid";
+ public static final String KEY_STATUS = "status";
+ public static final String KEY_CREATED_BY = "createdBy";
+ public static final String KEY_UPDATED_BY = "updatedBy";
+ public static final String KEY_CREATE_TIME = "createTime";
+ public static final String KEY_UPDATE_TIME = "updateTime";
+ public static final String KEY_VERSION = "version";
+
+
/**
* Status of the entity - can be active or deleted. Deleted entities are not removed from Atlas store.
*/
@@ -102,6 +111,48 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
init();
}
+ public AtlasEntity(Map map) {
+ super(map);
+
+ if (map != null) {
+ Object oGuid = map.get(KEY_GUID);
+ Object status = map.get(KEY_STATUS);
+ Object createdBy = map.get(KEY_CREATED_BY);
+ Object updatedBy = map.get(KEY_UPDATED_BY);
+ Object createTime = map.get(KEY_CREATE_TIME);
+ Object updateTime = map.get(KEY_UPDATE_TIME);
+ Object version = map.get(KEY_VERSION);
+
+ if (oGuid != null) {
+ setGuid(oGuid.toString());
+ }
+
+ if (status != null) {
+ setStatus(Status.valueOf(status.toString()));
+ }
+
+ if (createdBy != null) {
+ setCreatedBy(createdBy.toString());
+ }
+
+ if (createTime instanceof Number) {
+ setCreateTime(new Date(((Number) createTime).longValue()));
+ }
+
+ if (updatedBy != null) {
+ setUpdatedBy(updatedBy.toString());
+ }
+
+ if (updateTime instanceof Number) {
+ setUpdateTime(new Date(((Number) updateTime).longValue()));
+ }
+
+ if (version instanceof Number) {
+ setVersion(((Number) version).longValue());
+ }
+ }
+ }
+
public AtlasEntity(AtlasEntity other) {
super(other);
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
index a9912fb..576847f 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
@@ -25,12 +25,14 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
+import org.apache.atlas.type.AtlasBuiltInTypes;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.Date;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,6 +52,18 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
public class AtlasRelationship extends AtlasStruct implements Serializable {
private static final long serialVersionUID = 1L;
+ public static final String KEY_GUID = "guid";
+ public static final String KEY_STATUS = "status";
+ public static final String KEY_CREATED_BY = "createdBy";
+ public static final String KEY_UPDATED_BY = "updatedBy";
+ public static final String KEY_CREATE_TIME = "createTime";
+ public static final String KEY_UPDATE_TIME = "updateTime";
+ public static final String KEY_VERSION = "version";
+ public static final String KEY_END1 = "end1";
+ public static final String KEY_END2 = "end2";
+ public static final String KEY_LABEL = "label";
+ public static final String KEY_PROPAGATE_TAGS = "propagateTags";
+
private String guid = null;
private AtlasObjectId end1 = null;
private AtlasObjectId end2 = null;
@@ -105,6 +119,76 @@ public class AtlasRelationship extends AtlasStruct implements Serializable {
this(relationshipDef != null ? relationshipDef.getName() : null);
}
+ public AtlasRelationship(Map map) {
+ super(map);
+
+ if (map != null) {
+ Object oGuid = map.get(KEY_GUID);
+ Object oEnd1 = map.get(KEY_END1);
+ Object oEnd2 = map.get(KEY_END2);
+ Object label = map.get(KEY_LABEL);
+ Object propagateTags = map.get(KEY_PROPAGATE_TAGS);
+ Object status = map.get(KEY_STATUS);
+ Object createdBy = map.get(KEY_CREATED_BY);
+ Object updatedBy = map.get(KEY_UPDATED_BY);
+ Object createTime = map.get(KEY_CREATE_TIME);
+ Object updateTime = map.get(KEY_UPDATE_TIME);
+ Object version = map.get(KEY_VERSION);
+
+ if (oGuid != null) {
+ setGuid(oGuid.toString());
+ }
+
+ if (oEnd1 != null) {
+ if (oEnd1 instanceof AtlasObjectId) {
+ setEnd1((AtlasObjectId) oEnd1);
+ } else if (oEnd1 instanceof Map) {
+ setEnd1(new AtlasObjectId((Map) oEnd1));
+ }
+ }
+
+ if (oEnd2 != null) {
+ if (oEnd2 instanceof AtlasObjectId) {
+ setEnd2((AtlasObjectId) oEnd2);
+ } else if (oEnd2 instanceof Map) {
+ setEnd2(new AtlasObjectId((Map) oEnd2));
+ }
+ }
+
+ if (label != null) {
+ setLabel(label.toString());
+ }
+
+ if (propagateTags != null) {
+ setPropagateTags(PropagateTags.valueOf(propagateTags.toString()));
+ }
+
+ if (status != null) {
+ setStatus(Status.valueOf(status.toString()));
+ }
+
+ if (createdBy != null) {
+ setCreatedBy(createdBy.toString());
+ }
+
+ if (createTime instanceof Number) {
+ setCreateTime(new Date(((Number) createTime).longValue()));
+ }
+
+ if (updatedBy != null) {
+ setUpdatedBy(updatedBy.toString());
+ }
+
+ if (updateTime instanceof Number) {
+ setUpdateTime(new Date(((Number) updateTime).longValue()));
+ }
+
+ if (version instanceof Number) {
+ setVersion(((Number) version).longValue());
+ }
+ }
+ }
+
public AtlasRelationship(AtlasRelationship other) {
super(other);
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
index 41061bc..18e7407 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
@@ -58,6 +58,9 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
public class AtlasStruct implements Serializable {
private static final long serialVersionUID = 1L;
+ public static final String KEY_TYPENAME = "typeName";
+ public static final String KEY_ATTRIBUTES = "attributes";
+
public static final String SERIALIZED_DATE_FORMAT_STR = "yyyyMMdd-HH:mm:ss.SSS-Z";
@Deprecated
public static final DateFormat DATE_FORMATTER = new SimpleDateFormat(SERIALIZED_DATE_FORMAT_STR);
@@ -83,6 +86,19 @@ public class AtlasStruct implements Serializable {
setAttribute(attrName, attrValue);
}
+ public AtlasStruct(Map map) {
+ if (map != null) {
+ Object typeName = map.get(KEY_TYPENAME);
+ Map attributes = (map.get(KEY_ATTRIBUTES) instanceof Map) ? (Map) map.get(KEY_ATTRIBUTES) : map;
+
+ if (typeName != null) {
+ setTypeName(typeName.toString());
+ }
+
+ setAttributes(new HashMap<>(attributes));
+ }
+ }
+
public AtlasStruct(AtlasStruct other) {
if (other != null) {
setTypeName(other.getTypeName());
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java b/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
index 5e30554..6f331ea 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
@@ -161,6 +161,86 @@ public class AtlasArrayType extends AtlasType {
}
@Override
+ public boolean areEqualValues(Object val1, Object val2) {
+ boolean ret = true;
+
+ if (val1 == null) {
+ ret = isEmptyArrayValue(val2);
+ } else if (val2 == null) {
+ ret = isEmptyArrayValue(val1);
+ } else {
+ if (val1.getClass().isArray() && val2.getClass().isArray()) {
+ int len = Array.getLength(val1);
+
+ if (len != Array.getLength(val2)) {
+ ret = false;
+ } else {
+ for (int i = 0; i < len; i++) {
+ if (!elementType.areEqualValues(Array.get(val1, i), Array.get(val2, i))) {
+ ret = false;
+
+ break;
+ }
+ }
+ }
+ } else if ((val1 instanceof Set) && (val2 instanceof Set)) {
+ Set set1 = (Set) val1;
+ Set set2 = (Set) val2;
+
+ if (set1.size() != set2.size()) {
+ ret = false;
+ } else {
+ for (Object elem1 : set1) {
+ boolean foundInSet2 = false;
+
+ for (Object elem2 : set2) {
+ if (elementType.areEqualValues(elem1, elem2)) {
+ foundInSet2 = true;
+
+ break;
+ }
+ }
+
+ if (!foundInSet2) {
+ ret = false;
+
+ break;
+ }
+ }
+ }
+ } else {
+ List list1 = getListFromValue(val1);
+
+ if (list1 == null) {
+ ret = false;
+ } else {
+ List list2 = getListFromValue(val2);
+
+ if (list2 == null) {
+ ret = false;
+ } else {
+ int len = list1.size();
+
+ if (len != list2.size()) {
+ ret = false;
+ } else {
+ for (int i = 0; i < len; i++) {
+ if (!elementType.areEqualValues(list1.get(i), list2.get(i))) {
+ ret = false;
+
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
public boolean isValidValueForUpdate(Object obj) {
if (obj != null) {
if (obj instanceof List || obj instanceof Set) {
@@ -439,4 +519,50 @@ public class AtlasArrayType extends AtlasType {
return true;
}
+
+ private boolean isEmptyArrayValue(Object val) {
+ if (val == null) {
+ return true;
+ } else if (val instanceof Collection) {
+ return ((Collection) val).isEmpty();
+ } else if (val.getClass().isArray()) {
+ return Array.getLength(val) == 0;
+ } else if (val instanceof String){
+ List list = AtlasType.fromJson(val.toString(), List.class);
+
+ return list == null || list.isEmpty();
+ }
+
+ return false;
+ }
+
+ private List getListFromValue(Object val) {
+ final List ret;
+
+ if (val instanceof List) {
+ ret = (List) val;
+ } else if (val instanceof Collection) {
+ int len = ((Collection) val).size();
+
+ ret = new ArrayList<>(len);
+
+ for (Object elem : ((Collection) val)) {
+ ret.add(elem);
+ }
+ } else if (val.getClass().isArray()) {
+ int len = Array.getLength(val);
+
+ ret = new ArrayList<>(len);
+
+ for (int i = 0; i < len; i++) {
+ ret.add(Array.get(val, i));
+ }
+ } else if (val instanceof String){
+ ret = AtlasType.fromJson(val.toString(), List.class);
+ } else {
+ ret = null;
+ }
+
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
index 1039de6..1e0c9e6 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
@@ -255,6 +255,7 @@ public class AtlasBuiltInTypes {
*/
public static class AtlasFloatType extends AtlasType {
private static final Float DEFAULT_VALUE = 0f;
+ private static final Float FLOAT_EPSILON = 0.00000001f;
public AtlasFloatType() {
super(AtlasBaseTypeDef.ATLAS_TYPE_FLOAT, TypeCategory.PRIMITIVE);
@@ -274,6 +275,32 @@ public class AtlasBuiltInTypes {
return getNormalizedValue(obj) != null;
}
+ public boolean areEqualValues(Object val1, Object val2) {
+ final boolean ret;
+
+ if (val1 == null) {
+ ret = val2 == null;
+ } else if (val2 == null) {
+ ret = false;
+ } else {
+ Float floatVal1 = getNormalizedValue(val1);
+
+ if (floatVal1 == null) {
+ ret = false;
+ } else {
+ Float floatVal2 = getNormalizedValue(val2);
+
+ if (floatVal2 == null) {
+ ret = false;
+ } else {
+ ret = Math.abs(floatVal1 - floatVal2) < FLOAT_EPSILON;
+ }
+ }
+ }
+
+ return ret;
+ }
+
@Override
public Float getNormalizedValue(Object obj) {
if (obj != null) {
@@ -304,6 +331,7 @@ public class AtlasBuiltInTypes {
*/
public static class AtlasDoubleType extends AtlasType {
private static final Double DEFAULT_VALUE = 0d;
+ private static final Double DOUBLE_EPSILON = 0.00000001d;
public AtlasDoubleType() {
super(AtlasBaseTypeDef.ATLAS_TYPE_DOUBLE, TypeCategory.PRIMITIVE);
@@ -323,6 +351,32 @@ public class AtlasBuiltInTypes {
return getNormalizedValue(obj) != null;
}
+ public boolean areEqualValues(Object val1, Object val2) {
+ final boolean ret;
+
+ if (val1 == null) {
+ ret = val2 == null;
+ } else if (val2 == null) {
+ ret = false;
+ } else {
+ Double doubleVal1 = getNormalizedValue(val1);
+
+ if (doubleVal1 == null) {
+ ret = false;
+ } else {
+ Double doubleVal2 = getNormalizedValue(val2);
+
+ if (doubleVal2 == null) {
+ ret = false;
+ } else {
+ ret = Math.abs(doubleVal1 - doubleVal2) < DOUBLE_EPSILON;
+ }
+ }
+ }
+
+ return ret;
+ }
+
@Override
public Double getNormalizedValue(Object obj) {
Double ret;
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/type/AtlasClassificationType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasClassificationType.java b/intg/src/main/java/org/apache/atlas/type/AtlasClassificationType.java
index e39089c..ae0c206 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasClassificationType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasClassificationType.java
@@ -302,6 +302,17 @@ public class AtlasClassificationType extends AtlasStructType {
}
@Override
+ public boolean areEqualValues(Object val1, Object val2) {
+ for (AtlasClassificationType superType : superTypes) {
+ if (!superType.areEqualValues(val1, val2)) {
+ return false;
+ }
+ }
+
+ return super.areEqualValues(val1, val2);
+ }
+
+ @Override
public boolean isValidValueForUpdate(Object obj) {
if (obj != null) {
for (AtlasClassificationType superType : superTypes) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 9af1d65..28054ef 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -276,6 +276,7 @@ public class AtlasEntityType extends AtlasStructType {
return ret;
}
+
@Override
public boolean isValidValue(Object obj) {
if (obj != null) {
@@ -292,6 +293,17 @@ public class AtlasEntityType extends AtlasStructType {
}
@Override
+ public boolean areEqualValues(Object val1, Object val2) {
+ for (AtlasEntityType superType : superTypes) {
+ if (!superType.areEqualValues(val1, val2)) {
+ return false;
+ }
+ }
+
+ return super.areEqualValues(val1, val2);
+ }
+
+ @Override
public boolean isValidValueForUpdate(Object obj) {
if (obj != null) {
for (AtlasEntityType superType : superTypes) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/type/AtlasMapType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasMapType.java b/intg/src/main/java/org/apache/atlas/type/AtlasMapType.java
index 1fdee83..2c5fa56 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasMapType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasMapType.java
@@ -24,6 +24,7 @@ import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Array;
import java.util.*;
@@ -124,6 +125,45 @@ public class AtlasMapType extends AtlasType {
}
@Override
+ public boolean areEqualValues(Object val1, Object val2) {
+ boolean ret = true;
+
+ if (val1 == null) {
+ ret = isEmptyMapValue(val2);
+ } else if (val2 == null) {
+ ret = isEmptyMapValue(val1);
+ } else {
+ Map map1 = getMapFromValue(val1);
+
+ if (map1 == null) {
+ ret = false;
+ } else {
+ Map map2 = getMapFromValue(val2);
+
+ if (map2 == null) {
+ ret = false;
+ } else {
+ int len = map1.size();
+
+ if (len != map2.size()) {
+ ret = false;
+ } else {
+ for (Object key : map1.keySet()) {
+ if (!valueType.areEqualValues(map1.get(key), map2.get(key))) {
+ ret = false;
+
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
public boolean isValidValueForUpdate(Object obj) {
if (obj != null) {
if (obj instanceof Map) {
@@ -303,4 +343,32 @@ public class AtlasMapType extends AtlasType {
return attributeType;
}
}
+
+ private boolean isEmptyMapValue(Object val) {
+ if (val == null) {
+ return true;
+ } else if (val instanceof Map) {
+ return ((Map) val).isEmpty();
+ } else if (val instanceof String) {
+ Map map = AtlasType.fromJson(val.toString(), Map.class);
+
+ return map == null || map.isEmpty();
+ }
+
+ return false;
+ }
+
+ private Map getMapFromValue(Object val) {
+ final Map ret;
+
+ if (val instanceof Map) {
+ ret = (Map) val;
+ } else if (val instanceof String) {
+ ret = AtlasType.fromJson(val.toString(), Map.class);
+ } else {
+ ret = null;
+ }
+
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
index aa26d18..61168f6 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
@@ -20,6 +20,9 @@ package org.apache.atlas.type;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
@@ -30,6 +33,9 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.Objects;
+
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
@@ -58,6 +64,7 @@ public class AtlasRelationshipType extends AtlasStructType {
resolveReferences(typeRegistry);
}
+
public AtlasRelationshipDef getRelationshipDef() { return relationshipDef; }
@Override
@@ -76,19 +83,17 @@ public class AtlasRelationshipType extends AtlasStructType {
if (type1 instanceof AtlasEntityType) {
end1Type = (AtlasEntityType) type1;
-
} else {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END_TYPE, getTypeName(), end1TypeName);
}
if (type2 instanceof AtlasEntityType) {
end2Type = (AtlasEntityType) type2;
-
} else {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END_TYPE, getTypeName(), end2TypeName);
}
- validateAtlasRelationshipDef(this.relationshipDef);
+ validateAtlasRelationshipDef(relationshipDef);
}
@Override
@@ -150,15 +155,46 @@ public class AtlasRelationshipType extends AtlasStructType {
@Override
public boolean isValidValue(Object obj) {
- boolean ret = true;
-
if (obj != null) {
-
- if (obj instanceof AtlasRelationshipType) {
- validateAtlasRelationshipType((AtlasRelationshipType) obj);
+ if (obj instanceof AtlasRelationship) {
+ return validateRelationship((AtlasRelationship) obj);
+ } else {
+ return false;
}
+ }
- ret = super.isValidValue(obj);
+ return true;
+ }
+
+ @Override
+ public boolean areEqualValues(Object val1, Object val2) {
+ final boolean ret;
+
+ if (val1 == null) {
+ ret = val2 == null;
+ } else if (val2 == null) {
+ ret = false;
+ } else {
+ AtlasRelationship rel1 = getRelationshipFromValue(val1);
+
+ if (rel1 == null) {
+ ret = false;
+ } else {
+ AtlasRelationship rel2 = getRelationshipFromValue(val2);
+
+ if (rel2 == null) {
+ ret = false;
+ } else if (!super.areEqualValues(rel1, rel2)) {
+ ret = false;
+ } else {
+ ret = Objects.equals(rel1.getGuid(), rel2.getGuid()) &&
+ Objects.equals(rel1.getEnd1(), rel2.getEnd1()) &&
+ Objects.equals(rel1.getEnd2(), rel2.getEnd2()) &&
+ Objects.equals(rel1.getLabel(), rel2.getLabel()) &&
+ Objects.equals(rel1.getPropagateTags(), rel2.getPropagateTags()) &&
+ Objects.equals(rel1.getStatus(), rel2.getStatus());
+ }
+ }
}
return ret;
@@ -166,14 +202,15 @@ public class AtlasRelationshipType extends AtlasStructType {
@Override
public boolean isValidValueForUpdate(Object obj) {
- boolean ret = true;
-
if (obj != null) {
- validateAtlasRelationshipType((AtlasRelationshipType) obj);
- ret = super.isValidValueForUpdate(obj);
+ if (obj instanceof AtlasRelationship) {
+ return validateRelationship((AtlasRelationship) obj);
+ } else {
+ return false;
+ }
}
- return ret;
+ return true;
}
public AtlasEntityType getEnd1Type() { return end1Type; }
@@ -182,18 +219,18 @@ public class AtlasRelationshipType extends AtlasStructType {
/**
* Validate the fields in the the RelationshipType are consistent with respect to themselves.
- * @param type
+ * @param relationship
* @throws AtlasBaseException
*/
- private boolean validateAtlasRelationshipType(AtlasRelationshipType type) {
- boolean isValid = false;
- try {
- validateAtlasRelationshipDef(type.getRelationshipDef());
- isValid = true;
- } catch (AtlasBaseException abe) {
- LOG.error("Validation error for AtlasRelationshipType", abe);
+ private boolean validateRelationship(AtlasRelationship relationship) {
+ String end1TypeName = relationship.getEnd1() != null ? relationship.getEnd1().getTypeName() : null;
+ String end2TypeName = relationship.getEnd2() != null ? relationship.getEnd2().getTypeName() : null;
+
+ if (StringUtils.isNotEmpty(end1TypeName) && StringUtils.isNotEmpty(end2TypeName)) {
+ return end1Type.isTypeOrSuperTypeOf(end1TypeName) && end2Type.isTypeOrSuperTypeOf(end2TypeName) && super.isValidValue(relationship);
}
- return isValid;
+
+ return false;
}
/**
@@ -297,4 +334,18 @@ public class AtlasRelationshipType extends AtlasStructType {
return ret;
}
+
+ private AtlasRelationship getRelationshipFromValue(Object val) {
+ final AtlasRelationship ret;
+
+ if (val instanceof AtlasRelationship) {
+ ret = (AtlasRelationship) val;
+ } else if (val instanceof Map) {
+ ret = new AtlasRelationship((Map) val);
+ } else {
+ ret = null;
+ }
+
+ return ret;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
index 1c202e7..2f870dc 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
@@ -31,12 +31,7 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_ATTRIBUTE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_INVERSE_REF;
@@ -238,6 +233,51 @@ public class AtlasStructType extends AtlasType {
}
@Override
+ public boolean areEqualValues(Object val1, Object val2) {
+ boolean ret = true;
+
+ if (val1 == null) {
+ ret = val2 == null;
+ } else if (val2 == null) {
+ ret = false;
+ } else {
+ AtlasStruct structVal1 = getStructFromValue(val1);
+
+ if (structVal1 == null) {
+ ret = false;
+ } else {
+ AtlasStruct structVal2 = getStructFromValue(val2);
+
+ if (structVal2 == null) {
+ ret = false;
+ } else if (!StringUtils.equalsIgnoreCase(structVal1.getTypeName(), structVal2.getTypeName())) {
+ ret = false;
+ } else {
+ for (Map.Entry<String, Object> entry : structVal1.getAttributes().entrySet()) {
+ String attrName = entry.getKey();
+ AtlasAttribute attribute = getAttribute(attrName);
+
+ if (attribute == null) { // ignore unknown attribute
+ continue;
+ } else {
+ Object attrValue1 = entry.getValue();
+ Object attrValue2 = structVal2.getAttribute(attrName);
+
+ if (!attribute.getAttributeType().areEqualValues(attrValue1, attrValue2)) {
+ ret = false;
+
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
public boolean isValidValueForUpdate(Object obj) {
if (obj != null) {
Map<String, Object> attributes;
@@ -617,6 +657,28 @@ public class AtlasStructType extends AtlasType {
return Collections.unmodifiableMap(ret);
}
+ private AtlasStruct getStructFromValue(Object val) {
+ final AtlasStruct ret;
+
+ if (val instanceof AtlasStruct) {
+ ret = (AtlasStruct) val;
+ } else if (val instanceof Map) {
+ ret = new AtlasStruct((Map) val);
+ } else if (val instanceof String) {
+ Map map = AtlasType.fromJson(val.toString(), Map.class);
+
+ if (map == null) {
+ ret = null;
+ } else {
+ ret = new AtlasStruct((Map) val);
+ }
+ } else {
+ ret = null;
+ }
+
+ return ret;
+ }
+
public static class AtlasAttribute {
private final AtlasStructType definedInType;
private final AtlasType attributeType;
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/type/AtlasType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasType.java b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
index 47db2c0..b775f7c 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
/**
@@ -75,6 +76,32 @@ public abstract class AtlasType {
public abstract boolean isValidValue(Object obj);
+ public boolean areEqualValues(Object val1, Object val2) {
+ final boolean ret;
+
+ if (val1 == null) {
+ ret = val2 == null;
+ } else if (val2 == null) {
+ ret = false;
+ } else {
+ Object normalizedVal1 = getNormalizedValue(val1);
+
+ if (normalizedVal1 == null) {
+ ret = false;
+ } else {
+ Object normalizedVal2 = getNormalizedValue(val2);
+
+ if (normalizedVal2 == null) {
+ ret = false;
+ } else {
+ ret = Objects.equals(normalizedVal1, normalizedVal2);
+ }
+ }
+ }
+
+ return ret;
+ }
+
public abstract Object getNormalizedValue(Object obj);
public boolean validateValue(Object obj, String objName, List<String> messages) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
index e237e86..27409c7 100644
--- a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
+++ b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
@@ -47,33 +47,18 @@ public class AtlasEntityUtil {
for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
String attrName = attribute.getName();
AtlasType attrType = attribute.getAttributeType();
- Object currValue = attrType.getNormalizedValue(currEntity.getAttribute(attrName));
- Object newValue = attrType.getNormalizedValue(newEntity.getAttribute(attrName));
+ Object currValue = currEntity.getAttribute(attrName);
+ Object newValue = newEntity.getAttribute(attrName);
- if (!Objects.equals(currValue, newValue)) {
+ if (!attrType.areEqualValues(currEntity.getAttribute(attrName), newEntity.getAttribute(attrName))) {
ret = true;
- // for map/list types, treat 'null' same as empty
- if ((currValue == null && newValue != null) || (currValue != null && newValue == null)) {
- if (attrType instanceof AtlasMapType) {
- if (MapUtils.isEmpty((Map) currValue) && MapUtils.isEmpty((Map) newValue)) {
- ret = false;
- }
- } else if (attrType instanceof AtlasArrayType) {
- if (CollectionUtils.isEmpty((Collection) currValue) && CollectionUtils.isEmpty((Collection) newValue)) {
- ret = false;
- }
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("hasAnyAttributeUpdate(guid={}, typeName={}): attribute '{}' is found updated - currentValue={}, newValue={}",
+ currEntity.getGuid(), currEntity.getTypeName(), attrName, currValue, newValue);
}
- if (ret) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("hasAnyAttributeUpdate(guid={}, typeName={}): attribute '{}' is found updated - currentValue={}, newValue={}",
- currEntity.getGuid(), currEntity.getTypeName(), attrName, currValue, newValue);
- }
-
- break;
- }
+ break;
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasRelationshipStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasRelationshipStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasRelationshipStore.java
index 8043760..1561bd5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasRelationshipStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasRelationshipStore.java
@@ -19,6 +19,9 @@ package org.apache.atlas.repository.store.graph;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+
/**
* Persistence/Retrieval API for AtlasRelationship
@@ -45,6 +48,9 @@ public interface AtlasRelationshipStore {
*/
AtlasRelationship getById(String guid) throws AtlasBaseException;
+
+ AtlasEdge getOrCreate(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException;
+
/**
* Retrieve a relationship if it exists or creates a new relationship instance.
* @param relationship relationship instance definition
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
index 4c511c1..1bda8ff 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -20,10 +20,13 @@ package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.v1.model.instance.Referenceable;
@@ -213,8 +216,8 @@ public class AtlasEntityChangeNotifier {
return ret;
}
- private void doFullTextMapping(List<AtlasEntityHeader> atlasEntityHeaders) {
- if (CollectionUtils.isEmpty(atlasEntityHeaders)) {
+ private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) {
+ if (CollectionUtils.isEmpty(entityHeaders)) {
return;
}
@@ -226,18 +229,22 @@ public class AtlasEntityChangeNotifier {
LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping");
}
- for (AtlasEntityHeader atlasEntityHeader : atlasEntityHeaders) {
- String guid = atlasEntityHeader.getGuid();
- AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(guid);
+ for (AtlasEntityHeader entityHeader : entityHeaders) {
+ if(GraphHelper.isInternalType(entityHeader.getTypeName())) {
+ continue;
+ }
+
+ String guid = entityHeader.getGuid();
+ AtlasVertex vertex = AtlasGraphUtilsV1.findByGuid(guid);
- if(atlasVertex == null || GraphHelper.isInternalType(atlasVertex)) {
+ if(vertex == null) {
continue;
}
try {
String fullText = fullTextMapperV2.getIndexTextForEntity(guid);
- GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText);
+ GraphHelper.setProperty(vertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText);
} catch (AtlasBaseException e) {
LOG.error("FullText mapping failed for Vertex[ guid = {} ]", guid, e);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index db7594d..3a6f733 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -40,6 +40,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasEntityUtil;
+import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -57,6 +58,8 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UP
@Component
public class AtlasEntityStoreV1 implements AtlasEntityStore {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
+ private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("store.EntityStore");
+
private final DeleteHandlerV1 deleteHandler;
private final AtlasTypeRegistry typeRegistry;
@@ -167,50 +170,60 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
- // Create/Update entities
- EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);
+ AtlasPerfTracer perf = null;
- // for existing entities, skip update if incoming entity doesn't have any change
- if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) {
- EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()");
+ }
- List<AtlasEntity> entitiesToSkipUpdate = null;
- for (AtlasEntity entity : context.getUpdatedEntities()) {
- String guid = entity.getGuid();
- AtlasVertex vertex = context.getVertex(guid);
- AtlasEntity entityInStore = entityRetriever.toAtlasEntity(vertex);
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+ try {
+ // Create/Update entities
+ EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);
- if (!AtlasEntityUtil.hasAnyAttributeUpdate(entityType, entity, entityInStore)) {
- // if classifications are to be replaced as well, then skip updates only when no change in classifications as well
- if (!replaceClassifications || Objects.equals(entity.getClassifications(), entityInStore.getClassifications())) {
- if (entitiesToSkipUpdate == null) {
- entitiesToSkipUpdate = new ArrayList<>();
- }
+ // for existing entities, skip update if incoming entity doesn't have any change
+ if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) {
+ EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
+
+ List<AtlasEntity> entitiesToSkipUpdate = null;
+ for (AtlasEntity entity : context.getUpdatedEntities()) {
+ String guid = entity.getGuid();
+ AtlasVertex vertex = context.getVertex(guid);
+ AtlasEntity entityInStore = entityRetriever.toAtlasEntity(vertex);
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+ if (!AtlasEntityUtil.hasAnyAttributeUpdate(entityType, entity, entityInStore)) {
+ // if classifications are to be replaced as well, then skip updates only when no change in classifications as well
+ if (!replaceClassifications || Objects.equals(entity.getClassifications(), entityInStore.getClassifications())) {
+ if (entitiesToSkipUpdate == null) {
+ entitiesToSkipUpdate = new ArrayList<>();
+ }
- entitiesToSkipUpdate.add(entity);
+ entitiesToSkipUpdate.add(entity);
+ }
}
}
- }
- if (entitiesToSkipUpdate != null) {
- context.getUpdatedEntities().removeAll(entitiesToSkipUpdate);
+ if (entitiesToSkipUpdate != null) {
+ context.getUpdatedEntities().removeAll(entitiesToSkipUpdate);
+ }
}
- }
- EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications);
+ EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications);
- ret.setGuidAssignments(context.getGuidAssignments());
+ ret.setGuidAssignments(context.getGuidAssignments());
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== createOrUpdate()");
- }
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret, entityStream instanceof EntityImportStream);
- // Notify the change listeners
- entityChangeNotifier.onEntitiesMutated(ret, entityStream instanceof EntityImportStream);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== createOrUpdate()");
+ }
- return ret;
- }
+ return ret;
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
@Override
@GraphTransaction
@@ -584,10 +597,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid);
AtlasEntity entity = entityStream.getByGuid(guid);
- if (entity != null) {
-
+ if (entity != null) { // entity would be null if guid is not in the stream but referenced by an entity in the stream
if (vertex != null) {
- // entity would be null if guid is not in the stream but referenced by an entity in the stream
if (!isPartialUpdate) {
graphDiscoverer.validateAndNormalize(entity);
} else {
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
index 1cada0b..7389f49 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
@@ -17,7 +17,6 @@
*/
package org.apache.atlas.repository.store.graph.v1;
-import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
@@ -65,6 +64,7 @@ import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.getTy
@Component
public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipStoreV1.class);
+
private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
private final AtlasTypeRegistry typeRegistry;
@@ -74,9 +74,9 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
@Inject
public AtlasRelationshipStoreV1(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler) {
- this.typeRegistry = typeRegistry;
- this.entityRetriever = new EntityGraphRetriever(typeRegistry);
- this.deleteHandler = deleteHandler;
+ this.typeRegistry = typeRegistry;
+ this.entityRetriever = new EntityGraphRetriever(typeRegistry);
+ this.deleteHandler = deleteHandler;
}
@Override
@@ -86,11 +86,14 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
LOG.debug("==> create({})", relationship);
}
- validateRelationship(relationship);
+ AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+ AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
- AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
- AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
- AtlasRelationship ret = createRelationship(relationship, end1Vertex, end2Vertex);
+ validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes());
+
+ AtlasEdge edge = createRelationship(end1Vertex, end2Vertex, relationship);
+
+ AtlasRelationship ret = edge != null ? entityRetriever.mapEdgeToAtlasRelationship(edge) : null;
if (LOG.isDebugEnabled()) {
LOG.debug("<== create({}): {}", relationship, ret);
@@ -112,10 +115,58 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid);
}
- validateRelationship(relationship);
+ AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
+ String edgeType = AtlasGraphUtilsV1.getTypeName(edge);
+ AtlasVertex end1Vertex = edge.getInVertex();
+ AtlasVertex end2Vertex = edge.getOutVertex();
+
+ // update shouldn't change endType
+ if (StringUtils.isNotEmpty(relationship.getTypeName()) && !StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED, guid, edgeType, relationship.getTypeName());
+ }
+
+ // update shouldn't change ends
+ if (relationship.getEnd1() != null) {
+ String updatedEnd1Guid = relationship.getEnd1().getGuid();
- AtlasRelationship ret = updateRelationship(relationship);
+ if (updatedEnd1Guid == null) {
+ AtlasVertex updatedEnd1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+ updatedEnd1Guid = updatedEnd1Vertex == null ? null : AtlasGraphUtilsV1.getIdFromVertex(updatedEnd1Vertex);
+ }
+
+ if (updatedEnd1Guid != null) {
+ String end1Guid = AtlasGraphUtilsV1.getIdFromVertex(end1Vertex);
+
+ if (!StringUtils.equalsIgnoreCase(relationship.getEnd1().getGuid(), end1Guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, edgeType, guid, end1Guid, relationship.getEnd1().getGuid());
+ }
+ }
+ }
+
+ // update shouldn't change ends
+ if (relationship.getEnd2() != null) {
+ String updatedEnd2Guid = relationship.getEnd2().getGuid();
+
+ if (updatedEnd2Guid == null) {
+ AtlasVertex updatedEnd2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+
+ updatedEnd2Guid = updatedEnd2Vertex == null ? null : AtlasGraphUtilsV1.getIdFromVertex(updatedEnd2Vertex);
+ }
+
+ if (updatedEnd2Guid != null) {
+ String end2Guid = AtlasGraphUtilsV1.getIdFromVertex(end2Vertex);
+
+ if (!StringUtils.equalsIgnoreCase(relationship.getEnd2().getGuid(), end2Guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, AtlasGraphUtilsV1.getTypeName(edge), guid, end2Guid, relationship.getEnd2().getGuid());
+ }
+ }
+ }
+
+
+ validateRelationship(end1Vertex, end2Vertex, edgeType, relationship.getAttributes());
+
+ AtlasRelationship ret = updateRelationship(edge, relationship);
if (LOG.isDebugEnabled()) {
LOG.debug("<== update({}): {}", relationship, ret);
@@ -172,6 +223,20 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
}
}
+ @Override
+ public AtlasEdge getOrCreate(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
+ AtlasEdge ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
+
+ if (ret == null) {
+ validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes());
+
+ ret = createRelationship(end1Vertex, end2Vertex, relationship);
+ }
+
+ return ret;
+ }
+
+ @Override
public AtlasRelationship getOrCreate(AtlasRelationship relationship) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> getOrCreate({})", relationship);
@@ -181,17 +246,19 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
- AtlasRelationship ret;
+ AtlasRelationship ret = null;
// check if relationship exists
- AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship);
+ AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
- if (relationshipEdge != null) {
- ret = entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
-
- } else {
+ if (relationshipEdge == null) {
validateRelationship(relationship);
- ret = createRelationship(relationship, end1Vertex, end2Vertex);
+
+ relationshipEdge = createRelationship(end1Vertex, end2Vertex, relationship);
+ }
+
+ if (relationshipEdge != null){
+ ret = entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
}
if (LOG.isDebugEnabled()) {
@@ -201,15 +268,14 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
return ret;
}
- private AtlasRelationship createRelationship(AtlasRelationship relationship, AtlasVertex end1Vertex, AtlasVertex end2Vertex)
- throws AtlasBaseException {
- AtlasRelationship ret;
+ private AtlasEdge createRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
+ AtlasEdge ret = null;
try {
- AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship);
+ ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
- if (relationshipEdge == null) {
- relationshipEdge = createRelationshipEdge(end1Vertex, end2Vertex, relationship);
+ if (ret == null) {
+ ret = createRelationshipEdge(end1Vertex, end2Vertex, relationship);
AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
@@ -219,15 +285,12 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
String attrVertexProperty = attr.getVertexPropertyName();
Object attrValue = relationship.getAttribute(attrName);
- AtlasGraphUtilsV1.setProperty(relationshipEdge, attrVertexProperty, attrValue);
+ AtlasGraphUtilsV1.setProperty(ret, attrVertexProperty, attrValue);
}
}
-
- ret = entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
-
} else {
throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS, relationship.getTypeName(),
- relationship.getEnd1().getGuid(), relationship.getEnd2().getGuid());
+ AtlasGraphUtilsV1.getIdFromVertex(end1Vertex), AtlasGraphUtilsV1.getIdFromVertex(end2Vertex));
}
} catch (RepositoryException e) {
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
@@ -236,13 +299,7 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
return ret;
}
- private AtlasRelationship updateRelationship(AtlasRelationship relationship) throws AtlasBaseException {
- AtlasEdge relationshipEdge = graphHelper.getEdgeForGUID(relationship.getGuid());
-
- if (relationshipEdge == null) {
- throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, relationship.getGuid());
- }
-
+ private AtlasRelationship updateRelationship(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException {
AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
AtlasGraphUtilsV1.setProperty(relationshipEdge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, relationship.getPropagateTags().name());
@@ -253,9 +310,7 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
String attrVertexProperty = attr.getVertexPropertyName();
if (relationship.hasAttribute(attrName)) {
- Object attrValue = relationship.getAttribute(attrName);
-
- AtlasGraphUtilsV1.setProperty(relationshipEdge, attrVertexProperty, attrValue);
+ AtlasGraphUtilsV1.setProperty(relationshipEdge, attrVertexProperty, relationship.getAttribute(attrName));
}
}
}
@@ -281,18 +336,16 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "end1/end2 is null");
}
- if (!relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName) &&
- !relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
+ boolean validEndTypes = false;
- throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName,
- relationshipType.getEnd2Type().getTypeName(), end1TypeName);
+ if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
+ validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
+ } else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
+ validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
}
- if (!relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName) &&
- !relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName)) {
-
- throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName,
- relationshipType.getEnd1Type().getTypeName(), end2TypeName);
+ if (!validEndTypes) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
}
validateEnds(relationship);
@@ -300,6 +353,40 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
validateAndNormalize(relationship);
}
+ private void validateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName, Map<String, Object> attributes) throws AtlasBaseException {
+ String end1TypeName = AtlasGraphUtilsV1.getTypeName(end1Vertex);
+ String end2TypeName = AtlasGraphUtilsV1.getTypeName(end2Vertex);
+ AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName);
+
+ if (relationshipType == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'");
+ }
+
+ boolean validEndTypes = false;
+
+ if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
+ validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
+ } else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
+ validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
+ }
+
+ if (!validEndTypes) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
+ }
+
+ List<String> messages = new ArrayList<>();
+ AtlasRelationship relationship = new AtlasRelationship(relationshipName, attributes);
+
+ relationshipType.validateValue(relationship, relationshipName, messages);
+
+ if (!messages.isEmpty()) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages);
+ }
+
+ relationshipType.getNormalizedValue(relationship);
+ }
+
+
/**
* Validate the ends of the passed relationship
* @param relationship
@@ -374,8 +461,8 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
type.getNormalizedValue(relationship);
}
- public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) {
- String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship);
+ public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipType) {
+ String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType);
AtlasEdge ret = graphHelper.getEdgeForLabel(fromVertex, relationshipLabel);
if (ret != null) {
@@ -403,7 +490,6 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
if (StringUtils.isNotEmpty(endPoint.getGuid())) {
ret = AtlasGraphUtilsV1.findByGuid(endPoint.getGuid());
-
} else if (StringUtils.isNotEmpty(endPoint.getTypeName()) && MapUtils.isNotEmpty(endPoint.getUniqueAttributes())) {
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(endPoint.getTypeName());
@@ -414,7 +500,7 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
}
private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException {
- String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship);
+ String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship.getTypeName());
PropagateTags tagPropagation = getRelationshipTagPropagation(fromVertex, toVertex, relationship);
AtlasEdge ret = graphHelper.getOrCreateEdge(fromVertex, toVertex, relationshipLabel);
@@ -453,11 +539,12 @@ public class AtlasRelationshipStoreV1 implements AtlasRelationshipStore {
return ret;
}
- private String getRelationshipEdgeLabel(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) {
+ private String getRelationshipEdgeLabel(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipTypeName) {
if (LOG.isDebugEnabled()) {
- LOG.debug("validateEnds entry relationship:"+relationship);
+ LOG.debug("getRelationshipEdgeLabel({})", relationshipTypeName);
}
- AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+ AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
String ret = relationshipType.getRelationshipDef().getRelationshipLabel();
AtlasRelationshipEndDef endDef1 = relationshipType.getRelationshipDef().getEndDef1();
AtlasRelationshipEndDef endDef2 = relationshipType.getRelationshipDef().getEndDef2();
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index c203ff4..0fd4355 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -1422,25 +1422,7 @@ public class EntityGraphMapper {
private AtlasEdge getOrCreateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName,
Map<String, Object> relationshipAttributes) throws AtlasBaseException {
- AtlasEdge ret = null;
- AtlasObjectId end1 = new AtlasObjectId(getIdFromVertex(end1Vertex), AtlasGraphUtilsV1.getTypeName(end1Vertex));
- AtlasObjectId end2 = new AtlasObjectId(getIdFromVertex(end2Vertex), AtlasGraphUtilsV1.getTypeName(end2Vertex));
-
- AtlasRelationship relationship = relationshipStore.getOrCreate(new AtlasRelationship(relationshipName, end1, end2, relationshipAttributes));
- // return newly created AtlasEdge
- // if multiple edges are returned, compare using guid to pick the right one
- Iterator<AtlasEdge> outEdges = graphHelper.getOutGoingEdgesByLabel(end1Vertex, relationship.getLabel());
-
- while (outEdges.hasNext()) {
- AtlasEdge edge = outEdges.next();
-
- if (getIdFromVertex(end2Vertex).equals(getIdFromVertex(edge.getInVertex()))) {
- ret = edge;
- break;
- }
- }
-
- return ret;
+ return relationshipStore.getOrCreate(end1Vertex, end2Vertex, new AtlasRelationship(relationshipName, relationshipAttributes));
}
private boolean isRelationshipExists(AtlasVertex fromVertex, AtlasVertex toVertex, String edgeLabel) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/ad6b07a9/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java
index df5bd09..0a55d55 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityMutationContext.java
@@ -55,13 +55,15 @@ public class EntityMutationContext {
}
public void addUpdated(String internalGuid, AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) {
- entitiesUpdated.add(entity);
- entityVsType.put(entity.getGuid(), type);
- entityVsVertex.put(entity.getGuid(), atlasVertex);
-
- if (!StringUtils.equals(internalGuid, entity.getGuid())) {
- guidAssignments.put(internalGuid, entity.getGuid());
- entityVsVertex.put(internalGuid, atlasVertex);
+ if (!entityVsVertex.containsKey(internalGuid)) { // if the entity was already created/updated
+ entitiesUpdated.add(entity);
+ entityVsType.put(entity.getGuid(), type);
+ entityVsVertex.put(entity.getGuid(), atlasVertex);
+
+ if (!StringUtils.equals(internalGuid, entity.getGuid())) {
+ guidAssignments.put(internalGuid, entity.getGuid());
+ entityVsVertex.put(internalGuid, atlasVertex);
+ }
}
}