You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/04/05 07:07:29 UTC
[atlas] branch master updated: ATLAS-3054: updated notification pre-process to handle updates to ownedRef attributes - #3
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 3e4fb5c ATLAS-3054: updated notification pre-process to handle updates to ownedRef attributes - #3
3e4fb5c is described below
commit 3e4fb5cd84262c6c675e434628e607d7afd518f4
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Sun Mar 31 11:37:46 2019 -0700
ATLAS-3054: updated notification pre-process to handle updates to ownedRef attributes - #3
---
.../java/org/apache/atlas/type/AtlasArrayType.java | 344 ++++++++++++---------
.../org/apache/atlas/type/AtlasEntityType.java | 92 +++---
.../apache/atlas/type/AtlasRelationshipType.java | 11 +-
.../org/apache/atlas/type/AtlasStructType.java | 1 +
.../java/org/apache/atlas/type/AtlasTypeUtil.java | 27 ++
.../org/apache/atlas/type/TestAtlasStructType.java | 4 +-
.../converters/AtlasStructFormatConverter.java | 2 +-
.../store/graph/v2/AtlasEntityStream.java | 5 +
.../notification/NotificationHookConsumer.java | 124 +++++---
.../preprocessor/EntityPreprocessor.java | 1 +
.../preprocessor/HivePreprocessor.java | 63 +---
.../preprocessor/PreprocessorContext.java | 186 ++++++++++-
.../preprocessor/RdbmsPreprocessor.java | 16 +-
13 files changed, 549 insertions(+), 327 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java b/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
index 6147eee..656d946 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasArrayType.java
@@ -21,12 +21,14 @@ package org.apache.atlas.type;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -43,33 +45,36 @@ public class AtlasArrayType extends AtlasType {
private final String elementTypeName;
private int minCount;
private int maxCount;
+ private Cardinality cardinality;
private AtlasType elementType;
public AtlasArrayType(AtlasType elementType) {
- this(elementType, COUNT_NOT_SET, COUNT_NOT_SET);
+ this(elementType, COUNT_NOT_SET, COUNT_NOT_SET, Cardinality.LIST);
}
- public AtlasArrayType(AtlasType elementType, int minCount, int maxCount) {
+ public AtlasArrayType(AtlasType elementType, int minCount, int maxCount, Cardinality cardinality) {
super(AtlasBaseTypeDef.getArrayTypeName(elementType.getTypeName()), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE);
this.elementTypeName = elementType.getTypeName();
this.minCount = minCount;
this.maxCount = maxCount;
+ this.cardinality = cardinality;
this.elementType = elementType;
}
public AtlasArrayType(String elementTypeName, AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
- this(elementTypeName, COUNT_NOT_SET, COUNT_NOT_SET, typeRegistry);
+ this(elementTypeName, COUNT_NOT_SET, COUNT_NOT_SET, Cardinality.LIST, typeRegistry);
}
- public AtlasArrayType(String elementTypeName, int minCount, int maxCount, AtlasTypeRegistry typeRegistry)
+ public AtlasArrayType(String elementTypeName, int minCount, int maxCount, Cardinality cardinality, AtlasTypeRegistry typeRegistry)
throws AtlasBaseException {
super(AtlasBaseTypeDef.getArrayTypeName(elementTypeName), TypeCategory.ARRAY, SERVICE_TYPE_ATLAS_CORE);
this.elementTypeName = elementTypeName;
this.minCount = minCount;
this.maxCount = maxCount;
+ this.cardinality = cardinality;
this.resolveReferences(typeRegistry);
}
@@ -90,6 +95,10 @@ public class AtlasArrayType extends AtlasType {
return maxCount;
}
+ public void setCardinality(Cardinality cardinality) { this.cardinality = cardinality; }
+
+ public Cardinality getCardinality() { return cardinality; }
+
public AtlasType getElementType() {
return elementType;
}
@@ -151,79 +160,12 @@ public class AtlasArrayType extends AtlasType {
@Override
public boolean areEqualValues(Object val1, Object val2, Map<String, String> guidAssignments) {
- boolean ret = true;
+ final boolean ret;
- if (val1 == null) {
- ret = val2 == null;
- } else if (val2 == null) {
- ret = false;
+ if (cardinality == Cardinality.SET) {
+ ret = areEqualSets(val1, val2, guidAssignments);
} else {
- if (val1.getClass().isArray() && val2.getClass().isArray()) {
- int len = Array.getLength(val1);
-
- if (len != Array.getLength(val2)) {
- ret = false;
- } else {
- for (int i = 0; i < len; i++) {
- if (!elementType.areEqualValues(Array.get(val1, i), Array.get(val2, i), guidAssignments)) {
- ret = false;
-
- break;
- }
- }
- }
- } else if ((val1 instanceof Set) && (val2 instanceof Set)) {
- Set set1 = (Set) val1;
- Set set2 = (Set) val2;
-
- if (set1.size() != set2.size()) {
- ret = false;
- } else {
- for (Object elem1 : set1) {
- boolean foundInSet2 = false;
-
- for (Object elem2 : set2) {
- if (elementType.areEqualValues(elem1, elem2, guidAssignments)) {
- foundInSet2 = true;
-
- break;
- }
- }
-
- if (!foundInSet2) {
- ret = false;
-
- break;
- }
- }
- }
- } else {
- List list1 = getListFromValue(val1);
-
- if (list1 == null) {
- ret = false;
- } else {
- List list2 = getListFromValue(val2);
-
- if (list2 == null) {
- ret = false;
- } else {
- int len = list1.size();
-
- if (len != list2.size()) {
- ret = false;
- } else {
- for (int i = 0; i < len; i++) {
- if (!elementType.areEqualValues(list1.get(i), list2.get(i), guidAssignments)) {
- ret = false;
-
- break;
- }
- }
- }
- }
- }
- }
+ ret = areEqualLists(val1, val2, guidAssignments);
}
return ret;
@@ -266,128 +208,120 @@ public class AtlasArrayType extends AtlasType {
@Override
public Collection<?> getNormalizedValue(Object obj) {
- if (obj == null) {
- return null;
- }
+ Collection<Object> ret = null;
- if (obj instanceof String){
- obj = AtlasType.fromJson(obj.toString(), List.class);
+ if (obj instanceof String) {
+ obj = AtlasType.fromJson(obj.toString(), List.class);
}
if (obj instanceof List || obj instanceof Set) {
- List<Object> ret = new ArrayList<>();
+ Collection collObj = (Collection) obj;
- Collection objList = (Collection) obj;
+ if (isValidElementCount(collObj.size())) {
+ ret = new ArrayList<>(collObj.size());
- if (!isValidElementCount(objList.size())) {
- return null;
- }
+ for (Object element : collObj) {
+ if (element != null) {
+ Object normalizedValue = elementType.getNormalizedValue(element);
- for (Object element : objList) {
- if (element != null) {
- Object normalizedValue = elementType.getNormalizedValue(element);
+ if (normalizedValue != null) {
+ ret.add(normalizedValue);
+ } else {
+ ret = null; // invalid element value
- if (normalizedValue != null) {
- ret.add(normalizedValue);
+ break;
+ }
} else {
- return null; // invalid element value
+ ret.add(element);
}
- } else {
- ret.add(element);
}
}
-
- return ret;
- } else if (obj.getClass().isArray()) {
- List<Object> ret = new ArrayList<>();
-
+ } else if (obj != null && obj.getClass().isArray()) {
int arrayLen = Array.getLength(obj);
- if (!isValidElementCount(arrayLen)) {
- return null;
- }
+ if (isValidElementCount(arrayLen)) {
+ ret = new ArrayList<>(arrayLen);
- for (int i = 0; i < arrayLen; i++) {
- Object element = Array.get(obj, i);
+ for (int i = 0; i < arrayLen; i++) {
+ Object element = Array.get(obj, i);
- if (element != null) {
- Object normalizedValue = elementType.getNormalizedValue(element);
+ if (element != null) {
+ Object normalizedValue = elementType.getNormalizedValue(element);
+
+ if (normalizedValue != null) {
+ ret.add(normalizedValue);
+ } else {
+ ret = null; // invalid element value
- if (normalizedValue != null) {
- ret.add(normalizedValue);
+ break;
+ }
} else {
- return null; // invalid element value
+ ret.add(element);
}
- } else {
- ret.add(element);
}
}
-
- return ret;
}
- return null;
+ return ret;
}
@Override
public Collection<?> getNormalizedValueForUpdate(Object obj) {
- if (obj == null) {
- return null;
+ Collection<Object> ret = null;
+
+ if (obj instanceof String) {
+ obj = AtlasType.fromJson(obj.toString(), List.class);
}
if (obj instanceof List || obj instanceof Set) {
- List<Object> ret = new ArrayList<>();
-
Collection objList = (Collection) obj;
- if (!isValidElementCount(objList.size())) {
- return null;
- }
+ if (isValidElementCount(objList.size())) {
+ ret = new ArrayList<>(objList.size());
- for (Object element : objList) {
- if (element != null) {
- Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
+ for (Object element : objList) {
+ if (element != null) {
+ Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
- if (normalizedValue != null) {
- ret.add(normalizedValue);
+ if (normalizedValue != null) {
+ ret.add(normalizedValue);
+ } else {
+ ret = null; // invalid element value
+
+ break;
+ }
} else {
- return null; // invalid element value
+ ret.add(element);
}
- } else {
- ret.add(element);
}
}
-
- return ret;
- } else if (obj.getClass().isArray()) {
- List<Object> ret = new ArrayList<>();
-
+ } else if (obj != null && obj.getClass().isArray()) {
int arrayLen = Array.getLength(obj);
- if (!isValidElementCount(arrayLen)) {
- return null;
- }
+ if (isValidElementCount(arrayLen)) {
+ ret = new ArrayList<>(arrayLen);
- for (int i = 0; i < arrayLen; i++) {
- Object element = Array.get(obj, i);
+ for (int i = 0; i < arrayLen; i++) {
+ Object element = Array.get(obj, i);
- if (element != null) {
- Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
+ if (element != null) {
+ Object normalizedValue = elementType.getNormalizedValueForUpdate(element);
+
+ if (normalizedValue != null) {
+ ret.add(normalizedValue);
+ } else {
+ ret = null; // invalid element value
- if (normalizedValue != null) {
- ret.add(normalizedValue);
+ break;
+ }
} else {
- return null; // invalid element value
+ ret.add(element);
}
- } else {
- ret.add(element);
}
}
-
- return ret;
}
- return null;
+ return ret;
}
@Override
@@ -483,7 +417,7 @@ public class AtlasArrayType extends AtlasType {
if (elementAttributeType == elementType) {
return this;
} else {
- AtlasType attributeType = new AtlasArrayType(elementAttributeType, minCount, maxCount);
+ AtlasType attributeType = new AtlasArrayType(elementAttributeType, minCount, maxCount, cardinality);
if (LOG.isDebugEnabled()) {
LOG.debug("getTypeForAttribute(): {} ==> {}", getTypeName(), attributeType.getTypeName());
@@ -523,29 +457,131 @@ public class AtlasArrayType extends AtlasType {
return false;
}
+ private boolean areEqualSets(Object val1, Object val2, Map<String, String> guidAssignments) {
+ boolean ret = true;
+
+ if (val1 == null) {
+ ret = val2 == null;
+ } else if (val2 == null) {
+ ret = false;
+ } else if (val1 == val2) {
+ ret = true;
+ } else {
+ Set set1 = getSetFromValue(val1);
+ Set set2 = getSetFromValue(val2);
+
+ if (set1.size() != set2.size()) {
+ ret = false;
+ } else {
+ for (Object elem1 : set1) {
+ boolean foundInSet2 = false;
+
+ for (Object elem2 : set2) {
+ if (elementType.areEqualValues(elem1, elem2, guidAssignments)) {
+ foundInSet2 = true;
+
+ break;
+ }
+ }
+
+ if (!foundInSet2) {
+ ret = false;
+
+ break;
+ }
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ private boolean areEqualLists(Object val1, Object val2, Map<String, String> guidAssignments) {
+ boolean ret = true;
+
+ if (val1 == null) {
+ ret = val2 == null;
+ } else if (val2 == null) {
+ ret = false;
+ } else if (val1 == val2) {
+ ret = true;
+ } else if (val1.getClass().isArray() && val2.getClass().isArray()) {
+ int len = Array.getLength(val1);
+
+ if (len != Array.getLength(val2)) {
+ ret = false;
+ } else {
+ for (int i = 0; i < len; i++) {
+ if (!elementType.areEqualValues(Array.get(val1, i), Array.get(val2, i), guidAssignments)) {
+ ret = false;
+
+ break;
+ }
+ }
+ }
+ } else {
+ List list1 = getListFromValue(val1);
+ List list2 = getListFromValue(val2);
+
+ if (list1.size() != list2.size()) {
+ ret = false;
+ } else {
+ int len = list1.size();
+
+ for (int i = 0; i < len; i++) {
+ if (!elementType.areEqualValues(list1.get(i), list2.get(i), guidAssignments)) {
+ ret = false;
+
+ break;
+ }
+ }
+ }
+ }
+
+ return ret;
+ }
+
private List getListFromValue(Object val) {
final List ret;
if (val instanceof List) {
ret = (List) val;
} else if (val instanceof Collection) {
- int len = ((Collection) val).size();
+ ret = new ArrayList<>((Collection) val);
+ } else if (val.getClass().isArray()) {
+ int len = Array.getLength(val);
ret = new ArrayList<>(len);
- for (Object elem : ((Collection) val)) {
- ret.add(elem);
+ for (int i = 0; i < len; i++) {
+ ret.add(Array.get(val, i));
}
+ } else if (val instanceof String){
+ ret = AtlasType.fromJson(val.toString(), List.class);
+ } else {
+ ret = null;
+ }
+
+ return ret;
+ }
+
+ private Set getSetFromValue(Object val) {
+ final Set ret;
+
+ if (val instanceof Set) {
+ ret = (Set) val;
+ } else if (val instanceof Collection) {
+ ret = new HashSet<>((Collection) val);
} else if (val.getClass().isArray()) {
int len = Array.getLength(val);
- ret = new ArrayList<>(len);
+ ret = new HashSet<>(len);
for (int i = 0; i < len; i++) {
ret.add(Array.get(val, i));
}
} else if (val instanceof String){
- ret = AtlasType.fromJson(val.toString(), List.class);
+ ret = AtlasType.fromJson(val.toString(), Set.class);
} else {
ret = null;
}
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 1ce776e..d9ae9e3 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -22,7 +22,6 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
@@ -544,7 +543,9 @@ public class AtlasEntityType extends AtlasStructType {
superType.normalizeAttributeValues(ent);
}
- normalizeValues(ent);
+ super.normalizeAttributeValues(ent);
+
+ normalizeRelationshipAttributeValues(ent, false);
}
}
@@ -555,6 +556,8 @@ public class AtlasEntityType extends AtlasStructType {
}
super.normalizeAttributeValuesForUpdate(ent);
+
+ normalizeRelationshipAttributeValues(ent, true);
}
}
@@ -565,7 +568,9 @@ public class AtlasEntityType extends AtlasStructType {
superType.normalizeAttributeValues(obj);
}
- normalizeValues(obj);
+ super.normalizeAttributeValues(obj);
+
+ normalizeRelationshipAttributeValues(obj, false);
}
}
@@ -576,6 +581,8 @@ public class AtlasEntityType extends AtlasStructType {
}
super.normalizeAttributeValuesForUpdate(obj);
+
+ normalizeRelationshipAttributeValues(obj, true);
}
}
@@ -743,67 +750,56 @@ public class AtlasEntityType extends AtlasStructType {
return ret;
}
- private void normalizeRelationshipAttributeValues(AtlasStruct obj) {
- if (obj != null && obj instanceof AtlasEntity) {
- AtlasEntity entityObj = (AtlasEntity) obj;
-
+ private void normalizeRelationshipAttributeValues(AtlasEntity entity, boolean isUpdate) {
+ if (entity != null) {
for (String attributeName : relationshipAttributes.keySet()) {
- if (entityObj.hasRelationshipAttribute(attributeName)) {
- Object attributeValue = entityObj.getRelationshipAttribute(attributeName);
- String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
- AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType);
- AtlasAttributeDef attributeDef = attribute.getAttributeDef();
+ if (entity.hasRelationshipAttribute(attributeName)) {
+ Object attributeValue = entity.getRelationshipAttribute(attributeName);
+ String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
+ AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType);
- attributeValue = getNormalizedValue(attributeValue, attributeDef);
+ if (attribute != null) {
+ AtlasType attrType = attribute.getAttributeType();
+
+ if (isValidRelationshipType(attrType)) {
+ if (isUpdate) {
+ attributeValue = attrType.getNormalizedValueForUpdate(attributeValue);
+ } else {
+ attributeValue = attrType.getNormalizedValue(attributeValue);
+ }
- entityObj.setRelationshipAttribute(attributeName, attributeValue);
+ entity.setRelationshipAttribute(attributeName, attributeValue);
+ }
+ }
}
}
}
}
- public void normalizeRelationshipAttributeValues(Map<String, Object> obj) {
+ public void normalizeRelationshipAttributeValues(Map<String, Object> obj, boolean isUpdate) {
if (obj != null) {
for (String attributeName : relationshipAttributes.keySet()) {
if (obj.containsKey(attributeName)) {
- Object attributeValue = obj.get(attributeName);
- String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
- AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType);
- AtlasAttributeDef attributeDef = attribute.getAttributeDef();
-
- attributeValue = getNormalizedValue(attributeValue, attributeDef);
-
- obj.put(attributeName, attributeValue);
- }
- }
- }
- }
+ Object attributeValue = obj.get(attributeName);
+ String relationshipType = AtlasEntityUtil.getRelationshipType(attributeValue);
+ AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType);
- private Object getNormalizedValue(Object value, AtlasAttributeDef attributeDef) {
- String relationshipType = AtlasEntityUtil.getRelationshipType(value);
- AtlasAttribute attribute = getRelationshipAttribute(attributeDef.getName(), relationshipType);
+ if (attribute != null) {
+ AtlasType attrType = attribute.getAttributeType();
- if (attribute != null) {
- AtlasType attrType = attribute.getAttributeType();
+ if (isValidRelationshipType(attrType)) {
+ if (isUpdate) {
+ attributeValue = attrType.getNormalizedValueForUpdate(attributeValue);
+ } else {
+ attributeValue = attrType.getNormalizedValue(attributeValue);
+ }
- if (isValidRelationshipType(attrType) && value != null) {
- return attrType.getNormalizedValue(value);
+ obj.put(attributeName, attributeValue);
+ }
+ }
+ }
}
}
-
- return null;
- }
-
- private void normalizeValues(AtlasEntity ent) {
- super.normalizeAttributeValues(ent);
-
- normalizeRelationshipAttributeValues(ent);
- }
-
- private void normalizeValues(Map<String, Object> obj) {
- super.normalizeAttributeValues(obj);
-
- normalizeRelationshipAttributeValues(obj);
}
private boolean validateRelationshipAttributes(Object obj, String objName, List<String> messages) {
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
index 585d176..98071b2 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
@@ -346,8 +346,15 @@ public class AtlasRelationshipType extends AtlasStructType {
attributeDef.addConstraint(constraint);
}
- attribute = new AtlasAttribute(entityType, attributeDef,
- typeRegistry.getType(attrTypeName), getTypeName(), relationshipLabel);
+ AtlasType attrType = typeRegistry.getType(attrTypeName);
+
+ if (attrType instanceof AtlasArrayType) {
+ AtlasArrayType arrayType = (AtlasArrayType) attrType;
+
+ arrayType.setCardinality(attributeDef.getCardinality());
+ }
+
+ attribute = new AtlasAttribute(entityType, attributeDef, attrType, getTypeName(), relationshipLabel);
attribute.setLegacyAttribute(endDef.getIsLegacyAttribute());
} else {
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
index fb24df0..31953bd 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
@@ -99,6 +99,7 @@ public class AtlasStructType extends AtlasType {
arrayType.setMinCount(attributeDef.getValuesMinCount());
arrayType.setMaxCount(attributeDef.getValuesMaxCount());
+ arrayType.setCardinality(cardinality);
}
//check if attribute type is not classification
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
index 079a8fc..d74c7e3 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
@@ -38,6 +38,7 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.model.typedef.AtlasTypeDefHeader;
import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.v1.model.typedef.AttributeDefinition;
import org.apache.atlas.v1.model.typedef.ClassTypeDefinition;
import org.apache.atlas.v1.model.typedef.Multiplicity;
@@ -413,10 +414,36 @@ public class AtlasTypeUtil {
return new AtlasRelatedObjectId(getAtlasObjectId(entity));
}
+ public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) {
+ return new AtlasRelatedObjectId(getAtlasObjectId(entity, typeRegistry));
+ }
+
public static AtlasObjectId getAtlasObjectId(AtlasEntity entity) {
return new AtlasObjectId(entity.getGuid(), entity.getTypeName());
}
+ public static AtlasObjectId getAtlasObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) {
+ String typeName = entity.getTypeName();
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+ Map<String, Object> uniqAttributes = null;
+
+ if (entityType != null && MapUtils.isNotEmpty(entityType.getUniqAttributes())) {
+ for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
+ Object attrValue = entity.getAttribute(attribute.getName());
+
+ if (attrValue != null) {
+ if (uniqAttributes == null) {
+ uniqAttributes = new HashMap<>();
+ }
+
+ uniqAttributes.put(attribute.getName(), attrValue);
+ }
+ }
+ }
+
+ return new AtlasObjectId(entity.getGuid(), typeName, uniqAttributes);
+ }
+
public static AtlasObjectId getAtlasObjectId(AtlasEntityHeader header) {
return new AtlasObjectId(header.getGuid(), header.getTypeName());
}
diff --git a/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java b/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java
index a37dd46..f117fb3 100644
--- a/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java
+++ b/intg/src/test/java/org/apache/atlas/type/TestAtlasStructType.java
@@ -63,12 +63,12 @@ public class TestAtlasStructType {
multiValuedAttribMin.setName(MULTI_VAL_ATTR_NAME_MIN);
multiValuedAttribMin.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT));
- multiValuedAttribMin.setCardinality(Cardinality.SET);
+ multiValuedAttribMin.setCardinality(Cardinality.LIST);
multiValuedAttribMin.setValuesMinCount(MULTI_VAL_ATTR_MIN_COUNT);
multiValuedAttribMax.setName(MULTI_VAL_ATTR_NAME_MAX);
multiValuedAttribMax.setTypeName(AtlasBaseTypeDef.getArrayTypeName(ATLAS_TYPE_INT));
- multiValuedAttribMax.setCardinality(Cardinality.LIST);
+ multiValuedAttribMax.setCardinality(Cardinality.SET);
multiValuedAttribMax.setValuesMaxCount(MULTI_VAL_ATTR_MAX_COUNT);
AtlasStructDef structDef = ModelTestUtil.newStructDef();
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
index 173fcee..ae92b8b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
@@ -190,7 +190,7 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
if (entities != null) {
v2Value = entities;
- attrType = new AtlasArrayType(entityType);
+ attrType = new AtlasArrayType(entityType, arrayType.getMinCount(), arrayType.getMaxCount(), arrayType.getCardinality());
if (LOG.isDebugEnabled()) {
LOG.debug("{}: replaced objIdList with entityList", attr.getQualifiedName());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
index d12b036..c823b20 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
@@ -22,6 +22,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import java.util.Iterator;
+import java.util.List;
public class AtlasEntityStream implements EntityStream {
protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
@@ -33,6 +34,10 @@ public class AtlasEntityStream implements EntityStream {
this(new AtlasEntitiesWithExtInfo(entity), null);
}
+ public AtlasEntityStream(List<AtlasEntity> entities) {
+ this(new AtlasEntitiesWithExtInfo(entities), null);
+ }
+
public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) {
this(new AtlasEntitiesWithExtInfo(entityWithExtInfo), null);
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 48355c9..8430fd4 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -34,6 +34,7 @@ import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification;
@@ -79,7 +80,6 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -492,7 +492,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return;
}
- preProcessNotificationMessage(kafkaMsg);
+ PreprocessorContext context = preProcessNotificationMessage(kafkaMsg);
if (isEmptyMessage(kafkaMsg)) {
commit(kafkaMsg);
@@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
}
- createOrUpdate(entities, false);
+ createOrUpdate(entities, false, context);
}
break;
@@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
// There should only be one root entity
entities.getEntities().get(0).setGuid(guid);
- createOrUpdate(entities, true);
+ createOrUpdate(entities, true, context);
}
break;
@@ -579,7 +579,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
- createOrUpdate(entities, false);
+ createOrUpdate(entities, false, context);
}
break;
@@ -593,7 +593,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
}
- createOrUpdate(entities, false);
+ createOrUpdate(entities, false, context);
}
break;
@@ -622,7 +622,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
- createOrUpdate(entities, false);
+ createOrUpdate(entities, false, context);
}
break;
@@ -708,15 +708,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
- private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate) throws AtlasBaseException {
+ private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, PreprocessorContext context) throws AtlasBaseException {
List<AtlasEntity> entitiesList = entities.getEntities();
AtlasEntityStream entityStream = new AtlasEntityStream(entities);
if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
- atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
- } else {
- Map<String, String> guidAssignments = new HashMap<>();
+ EntityMutationResponse response = atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
+ recordProcessedEntities(response, context);
+ } else {
for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
int toIndex = fromIdx + commitBatchSize;
@@ -726,20 +726,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex));
- updateProcessedEntityReferences(entitiesBatch, guidAssignments);
+ updateProcessedEntityReferences(entitiesBatch, context.getGuidAssignments());
AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(entitiesBatch);
AtlasEntityStream batchStream = new AtlasEntityStream(batch, entityStream);
EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
- recordProcessedEntities(response, guidAssignments);
+ recordProcessedEntities(response, context);
RequestContext.get().resetEntityGuidUpdates();
RequestContext.get().clearCache();
}
}
+
+ if (context != null) {
+ context.prepareForPostUpdate();
+
+ List<AtlasEntity> postUpdateEntities = context.getPostUpdateEntities();
+
+ if (CollectionUtils.isNotEmpty(postUpdateEntities)) {
+ atlasEntityStore.createOrUpdate(new AtlasEntityStream(postUpdateEntities), true);
+ }
+ }
}
private void recordFailedMessages() {
@@ -815,49 +825,51 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
- private void preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
- if (!preprocessEnabled) {
- return;
- }
+ private PreprocessorContext preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
+ PreprocessorContext context = null;
- PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
+ if (preprocessEnabled) {
+ context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
- if (context.isHivePreprocessEnabled()) {
- preprocessHiveTypes(context);
- }
+ if (context.isHivePreprocessEnabled()) {
+ preprocessHiveTypes(context);
+ }
- if (skipHiveColumnLineageHive20633) {
- skipHiveColumnLineage(context);
- }
+ if (skipHiveColumnLineageHive20633) {
+ skipHiveColumnLineage(context);
+ }
- if (rdbmsTypesRemoveOwnedRefAttrs) {
- rdbmsTypeRemoveOwnedRefAttrs(context);
- }
+ if (rdbmsTypesRemoveOwnedRefAttrs) {
+ rdbmsTypeRemoveOwnedRefAttrs(context);
+ }
- context.moveRegisteredReferredEntities();
+ context.moveRegisteredReferredEntities();
- if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) {
- // move hive_process and hive_column_lineage entities to end of the list
- List<AtlasEntity> entities = context.getEntities();
- int count = entities.size();
+ if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) {
+ // move hive_process and hive_column_lineage entities to end of the list
+ List<AtlasEntity> entities = context.getEntities();
+ int count = entities.size();
- for (int i = 0; i < count; i++) {
- AtlasEntity entity = entities.get(i);
+ for (int i = 0; i < count; i++) {
+ AtlasEntity entity = entities.get(i);
- switch (entity.getTypeName()) {
- case TYPE_HIVE_PROCESS:
- case TYPE_HIVE_COLUMN_LINEAGE:
- entities.remove(i--);
- entities.add(entity);
- count--;
- break;
+ switch (entity.getTypeName()) {
+ case TYPE_HIVE_PROCESS:
+ case TYPE_HIVE_COLUMN_LINEAGE:
+ entities.remove(i--);
+ entities.add(entity);
+ count--;
+ break;
+ }
}
- }
- if (entities.size() - count > 0) {
- LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size());
+ if (entities.size() - count > 0) {
+ LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size());
+ }
}
}
+
+ return context;
}
private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
@@ -1009,12 +1021,26 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return ret;
}
- private void recordProcessedEntities(EntityMutationResponse mutationResponse, Map<String, String> guidAssignments) {
- if (mutationResponse != null && MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
- guidAssignments.putAll(mutationResponse.getGuidAssignments());
+ private void recordProcessedEntities(EntityMutationResponse mutationResponse, PreprocessorContext context) {
+ if (mutationResponse != null && context != null) {
+ if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
+ context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("recordProcessedEntities: added {} guidAssignments. updatedSize={}", mutationResponse.getGuidAssignments().size(), guidAssignments.size());
+ if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) {
+ for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) {
+ if (entity != null && entity.getGuid() != null) {
+ context.getCreatedEntities().add(entity.getGuid());
+ }
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) {
+ for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) {
+ if (entity != null && entity.getGuid() != null) {
+ context.getDeletedEntities().add(entity.getGuid());
+ }
+ }
}
}
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 085e746..9b620dd 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -53,6 +53,7 @@ public abstract class EntityPreprocessor {
public static final String ATTRIBUTE_TABLES = "tables";
public static final String ATTRIBUTE_INDEXES = "indexes";
public static final String ATTRIBUTE_FOREIGN_KEYS = "foreign_keys";
+ public static final String ATTRIBUTE_INSTANCE = "instance";
public static final char QNAME_SEP_CLUSTER_NAME = '@';
public static final char QNAME_SEP_ENTITY_NAME = '.';
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index 0b93658..cc31032 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -18,8 +18,6 @@
package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -27,16 +25,14 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
public class HivePreprocessor {
private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class);
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns";
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys";
+ private static final String RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC = "hive_table_storagedesc";
static class HiveTablePreprocessor extends EntityPreprocessor {
public HiveTablePreprocessor() {
@@ -67,63 +63,12 @@ public class HivePreprocessor {
entity.setAttribute(ATTRIBUTE_COLUMNS, null);
entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null);
} else if (context.getHiveTypesRemoveOwnedRefAttrs()) {
- context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD);
-
- removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, context);
- removeColumnsAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, context);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_SD, RELATIONSHIP_TYPE_HIVE_TABLE_STORAGEDESC, ATTRIBUTE_TABLE);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, ATTRIBUTE_TABLE);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, ATTRIBUTE_TABLE);
}
}
}
-
- private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) {
- Object attrVal = entity.removeAttribute(attrName);
-
- if (attrVal != null) {
- Set<String> guids = new HashSet<>();
-
- context.collectGuids(attrVal, guids);
-
- for (String guid : guids) {
- AtlasEntity colEntity = context.getEntity(guid);
-
- if (colEntity != null) {
- Object attrTable = null;
-
- if (colEntity.hasRelationshipAttribute(ATTRIBUTE_TABLE)) {
- attrTable = colEntity.getRelationshipAttribute(ATTRIBUTE_TABLE);
- } else if (colEntity.hasAttribute(ATTRIBUTE_TABLE)) {
- attrTable = colEntity.getAttribute(ATTRIBUTE_TABLE);
- }
-
- attrTable = setRelationshipType(attrTable, relationshipType);
-
- if (attrTable != null) {
- colEntity.setRelationshipAttribute(ATTRIBUTE_TABLE, attrTable);
- }
-
- context.addToReferredEntitiesToMove(guid);
- }
- }
- }
- }
-
- private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
- AtlasRelatedObjectId ret = null;
-
- if (attr instanceof AtlasRelatedObjectId) {
- ret = (AtlasRelatedObjectId) attr;
- } else if (attr instanceof AtlasObjectId) {
- ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
- } else if (attr instanceof Map) {
- ret = new AtlasRelatedObjectId((Map) attr);
- }
-
- if (ret != null) {
- ret.setRelationshipType(relationshipType);
- }
-
- return ret;
- }
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index c85c1b8..2db0574 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -21,18 +21,27 @@ import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
+
public class PreprocessorContext {
private static final Logger LOG = LoggerFactory.getLogger(PreprocessorContext.class);
@@ -40,6 +49,7 @@ public class PreprocessorContext {
public enum PreprocessAction { NONE, IGNORE, PRUNE }
private final AtlasKafkaMessage<HookNotification> kafkaMessage;
+ private final AtlasTypeRegistry typeRegistry;
private final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
private final List<Pattern> hiveTablesToIgnore;
private final List<Pattern> hiveTablesToPrune;
@@ -49,9 +59,14 @@ public class PreprocessorContext {
private final Set<String> ignoredEntities = new HashSet<>();
private final Set<String> prunedEntities = new HashSet<>();
private final Set<String> referredEntitiesToMove = new HashSet<>();
+ private final Set<String> createdEntities = new HashSet<>();
+ private final Set<String> deletedEntities = new HashSet<>();
+ private final Map<String, String> guidAssignments = new HashMap<>();
+ private List<AtlasEntity> postUpdateEntities = null;
- public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
+ public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
this.kafkaMessage = kafkaMessage;
+ this.typeRegistry = typeRegistry;
this.hiveTablesToIgnore = hiveTablesToIgnore;
this.hiveTablesToPrune = hiveTablesToPrune;
this.hiveTablesCache = hiveTablesCache;
@@ -119,6 +134,14 @@ public class PreprocessorContext {
public Set<String> getReferredEntitiesToMove() { return referredEntitiesToMove; }
+ public Set<String> getCreatedEntities() { return createdEntities; }
+
+ public Set<String> getDeletedEntities() { return deletedEntities; }
+
+ public Map<String, String> getGuidAssignments() { return guidAssignments; }
+
+ public List<AtlasEntity> getPostUpdateEntities() { return postUpdateEntities; }
+
public PreprocessAction getPreprocessActionForHiveTable(String qualifiedName) {
PreprocessAction ret = PreprocessAction.NONE;
@@ -199,12 +222,48 @@ public class PreprocessorContext {
collectGuids(obj, prunedEntities);
}
- public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName) {
- Set<String> guidsToMove = new HashSet<>();
+ public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, String refAttrName) {
+ Object attrVal = entity.removeAttribute(attrName);
+
+ if (attrVal != null) {
+ AtlasRelatedObjectId entityId = null;
+ Set<String> guids = new HashSet<>();
+
+ collectGuids(attrVal, guids);
+
+ // the removed attribute value may itself be missing elements (e.g. a dropped column); to handle this case, register the entity for a partial update after the main update
+ addToPostUpdate(entity, attrName, attrVal);
+
+ for (String guid : guids) {
+ AtlasEntity refEntity = getEntity(guid);
+
+ if (refEntity != null) {
+ Object refAttr = null;
+
+ if (refEntity.hasRelationshipAttribute(refAttrName)) {
+ refAttr = refEntity.getRelationshipAttribute(refAttrName);
+ } else if (refEntity.hasAttribute(refAttrName)) {
+ refAttr = refEntity.getAttribute(refAttrName);
+ } else {
+ if (entityId == null) {
+ entityId = AtlasTypeUtil.toAtlasRelatedObjectId(entity, typeRegistry);
+ }
+
+ refAttr = entityId;
+ }
+
+ if (refAttr != null) {
+ refAttr = setRelationshipType(refAttr, relationshipType);
+ }
- collectGuids(entity.removeAttribute(attrName), guidsToMove);
+ if (refAttr != null) {
+ refEntity.setRelationshipAttribute(refAttrName, refAttr);
+ }
- addToReferredEntitiesToMove(guidsToMove);
+ addToReferredEntitiesToMove(guid);
+ }
+ }
+ }
}
public void moveRegisteredReferredEntities() {
@@ -236,6 +295,32 @@ public class PreprocessorContext {
}
}
+ public void prepareForPostUpdate() {
+ if (postUpdateEntities != null) {
+ ListIterator<AtlasEntity> iter = postUpdateEntities.listIterator();
+
+ while (iter.hasNext()) {
+ AtlasEntity entity = iter.next();
+ String assignedGuid = getAssignedGuid(entity.getGuid());
+
+ // no need to perform partial-update for entities that are created/deleted while processing this message
+ if (createdEntities.contains(assignedGuid) || deletedEntities.contains(assignedGuid)) {
+ iter.remove();
+ } else {
+ entity.setGuid(assignedGuid);
+
+ if (entity.getAttributes() != null) {
+ setAssignedGuids(entity.getAttributes().values());
+ }
+
+ if (entity.getRelationshipAttributes() != null) {
+ setAssignedGuids(entity.getRelationshipAttributes().values());
+ }
+ }
+ }
+ }
+ }
+
public String getTypeName(Object obj) {
Object ret = null;
@@ -258,7 +343,7 @@ public class PreprocessorContext {
if (obj instanceof AtlasObjectId) {
ret = ((AtlasObjectId) obj).getGuid();
} else if (obj instanceof Map) {
- ret = ((Map) obj).get(AtlasObjectId.KEY_GUID);
+ ret = ((Map) obj).get(KEY_GUID);
} else if (obj instanceof AtlasEntity) {
ret = ((AtlasEntity) obj).getGuid();
} else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
@@ -274,7 +359,7 @@ public class PreprocessorContext {
Collection objList = (Collection) obj;
for (Object objElem : objList) {
- collectGuid(objElem, guids);
+ collectGuids(objElem, guids);
}
} else {
collectGuid(obj, guids);
@@ -304,4 +389,91 @@ public class PreprocessorContext {
return ret;
}
+
+ private AtlasRelatedObjectId setRelationshipType(Object attr, String relationshipType) {
+ AtlasRelatedObjectId ret = null;
+
+ if (attr instanceof AtlasRelatedObjectId) {
+ ret = (AtlasRelatedObjectId) attr;
+ } else if (attr instanceof AtlasObjectId) {
+ ret = new AtlasRelatedObjectId((AtlasObjectId) attr);
+ } else if (attr instanceof Map) {
+ ret = new AtlasRelatedObjectId((Map) attr);
+ }
+
+ if (ret != null) {
+ ret.setRelationshipType(relationshipType);
+ }
+
+ return ret;
+ }
+
+ private String getAssignedGuid(String guid) {
+ String ret = guidAssignments.get(guid);
+
+ return ret != null ? ret : guid;
+ }
+
+ private void setAssignedGuids(Object obj) {
+ if (obj != null) {
+ if (obj instanceof Collection) {
+ Collection objList = (Collection) obj;
+
+ for (Object objElem : objList) {
+ setAssignedGuids(objElem);
+ }
+ } else {
+ setAssignedGuid(obj);
+ }
+ }
+ }
+
+ private void setAssignedGuid(Object obj) {
+ if (obj instanceof AtlasRelatedObjectId) {
+ AtlasRelatedObjectId objId = (AtlasRelatedObjectId) obj;
+
+ objId.setGuid(getAssignedGuid(objId.getGuid()));
+ } else if (obj instanceof AtlasObjectId) {
+ AtlasObjectId objId = (AtlasObjectId) obj;
+
+ objId.setGuid(getAssignedGuid(objId.getGuid()));
+ } else if (obj instanceof Map) {
+ Map objId = (Map) obj;
+ Object guid = objId.get(KEY_GUID);
+
+ if (guid != null) {
+ objId.put(KEY_GUID, getAssignedGuid(guid.toString()));
+ }
+ }
+ }
+
+ private void addToPostUpdate(AtlasEntity entity, String attrName, Object attrVal) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addToPostUpdate(guid={}, entityType={}, attrName={}", entity.getGuid(), entity.getTypeName(), attrName);
+ }
+
+ AtlasEntity partialEntity = null;
+
+ if (postUpdateEntities == null) {
+ postUpdateEntities = new ArrayList<>();
+ }
+
+ for (AtlasEntity existing : postUpdateEntities) {
+ if (StringUtils.equals(entity.getGuid(), existing.getGuid())) {
+ partialEntity = existing;
+
+ break;
+ }
+ }
+
+ if (partialEntity == null) {
+ partialEntity = new AtlasEntity(entity.getTypeName(), attrName, attrVal);
+
+ partialEntity.setGuid(entity.getGuid());
+
+ postUpdateEntities.add(partialEntity);
+ } else {
+ partialEntity.setAttribute(attrName, attrVal);
+ }
+ }
}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
index adc1983..7dcfa2f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
@@ -32,6 +32,12 @@ import java.util.Set;
public class RdbmsPreprocessor {
private static final Logger LOG = LoggerFactory.getLogger(RdbmsPreprocessor.class);
+ private static final String RELATIONSHIP_TYPE_RDBMS_INSTANCE_DATABASES = "rdbms_instance_databases";
+ private static final String RELATIONSHIP_TYPE_RDBMS_DB_TABLES = "rdbms_db_tables";
+ private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_COLUMNS = "rdbms_table_columns";
+ private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_INDEXES = "rdbms_table_indexes";
+ private static final String RELATIONSHIP_TYPE_RDBMS_TABLE_FOREIGN_KEYS = "rdbms_table_foreign_key";
+
static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor {
public RdbmsInstancePreprocessor() {
super(TYPE_RDBMS_INSTANCE);
@@ -121,17 +127,17 @@ public class RdbmsPreprocessor {
private void clearRefAttributesAndMove(AtlasEntity entity, PreprocessorContext context) {
switch (entity.getTypeName()) {
case TYPE_RDBMS_INSTANCE:
- context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_DATABASES, RELATIONSHIP_TYPE_RDBMS_INSTANCE_DATABASES, ATTRIBUTE_INSTANCE);
break;
case TYPE_RDBMS_DB:
- context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_TABLES, RELATIONSHIP_TYPE_RDBMS_DB_TABLES, ATTRIBUTE_DB);
break;
case TYPE_RDBMS_TABLE:
- context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS);
- context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES);
- context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_RDBMS_TABLE_COLUMNS, ATTRIBUTE_TABLE);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_INDEXES, RELATIONSHIP_TYPE_RDBMS_TABLE_INDEXES, ATTRIBUTE_TABLE);
+ context.removeRefAttributeAndRegisterToMove(entity, ATTRIBUTE_FOREIGN_KEYS, RELATIONSHIP_TYPE_RDBMS_TABLE_FOREIGN_KEYS, ATTRIBUTE_TABLE);
break;
}
}