You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/03/12 21:29:53 UTC
[atlas] branch master updated: ATLAS-3054: improved batch
processing in notification handler to avoid processing of an entity
multiple times - #2
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new fc2a926 ATLAS-3054: improved batch processing in notification handler to avoid processing of an entity multiple times - #2
fc2a926 is described below
commit fc2a926cc7d0f0070c25f9afc68bf6a5a1bb6df2
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Mon Mar 11 16:17:03 2019 -0700
ATLAS-3054: improved batch processing in notification handler to avoid processing of an entity multiple times - #2
---
addons/models/1000-Hadoop/1030-hive_model.json | 2 +-
.../org/apache/atlas/type/AtlasBuiltInTypes.java | 13 +-
.../org/apache/atlas/type/AtlasEntityType.java | 4 +-
.../apache/atlas/type/AtlasRelationshipType.java | 7 +-
.../org/apache/atlas/type/AtlasStructType.java | 37 ++++-
.../org/apache/atlas/utils/AtlasEntityUtil.java | 33 ++++-
.../atlas/discovery/EntityDiscoveryService.java | 19 +--
.../converters/AtlasStructFormatConverter.java | 9 +-
.../store/graph/v2/AtlasEntityStoreV2.java | 2 +-
.../store/graph/v2/EntityGraphMapper.java | 79 ++++++++---
.../notification/NotificationHookConsumer.java | 153 ++++++++++++++++++++-
.../preprocessor/HivePreprocessor.java | 4 +-
.../preprocessor/PreprocessorContext.java | 4 +
13 files changed, 305 insertions(+), 61 deletions(-)
diff --git a/addons/models/1000-Hadoop/1030-hive_model.json b/addons/models/1000-Hadoop/1030-hive_model.json
index 324d716..7207a41 100644
--- a/addons/models/1000-Hadoop/1030-hive_model.json
+++ b/addons/models/1000-Hadoop/1030-hive_model.json
@@ -508,7 +508,7 @@
"serviceType": "hive",
"typeVersion": "1.2",
"relationshipCategory": "COMPOSITION",
- "relationshipLabel": "__hive_table.partitionkeys",
+ "relationshipLabel": "__hive_table.partitionKeys",
"endDef1": {
"type": "hive_table",
"name": "partitionKeys",
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
index 6bedf6d..ce14b5b 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
@@ -20,6 +20,7 @@ package org.apache.atlas.type;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
@@ -672,19 +673,25 @@ public class AtlasBuiltInTypes {
@Override
public AtlasObjectId getNormalizedValue(Object obj) {
+ AtlasObjectId ret = null;
+
if (obj != null) {
if (obj instanceof AtlasObjectId) {
- return (AtlasObjectId) obj;
+ ret = (AtlasObjectId) obj;
} else if (obj instanceof Map) {
Map map = (Map) obj;
if (isValidMap(map)) {
- return new AtlasObjectId(map);
+ if (map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
+ ret = new AtlasRelatedObjectId(map);
+ } else {
+ ret = new AtlasObjectId(map);
+ }
}
}
}
- return null;
+ return ret;
}
private boolean isValidMap(Map map) {
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 2557bb3..b5360c1 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -799,7 +799,7 @@ public class AtlasEntityType extends AtlasStructType {
AtlasEntity entityObj = (AtlasEntity) obj;
for (String attributeName : relationshipAttributes.keySet()) {
- Object value = entityObj.getAttribute(attributeName);
+ Object value = entityObj.getRelationshipAttribute(attributeName);
String relationshipType = AtlasEntityUtil.getRelationshipType(value);
AtlasAttribute attribute = getRelationshipAttribute(attributeName, relationshipType);
@@ -824,7 +824,7 @@ public class AtlasEntityType extends AtlasStructType {
}
}
} else if (obj instanceof Map) {
- Map attributes = AtlasTypeUtil.toStructAttributes((Map) obj);
+ Map attributes = AtlasTypeUtil.toRelationshipAttributes((Map) obj);
for (String attributeName : relationshipAttributes.keySet()) {
Object value = attributes.get(attributeName);
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
index 3ea8d80..183772b 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
@@ -159,7 +159,9 @@ public class AtlasRelationshipType extends AtlasStructType {
AtlasRelationshipEdgeDirection end2Direction = IN;
if (endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) {
- end2Direction = OUT;
+ if (relationshipDef.getRelationshipLabel() == null) { // only if label hasn't been overridden
+ end2Direction = OUT;
+ }
} else if (!endDef1.getIsLegacyAttribute() && endDef2.getIsLegacyAttribute()) {
end1Direction = IN;
end2Direction = OUT;
@@ -345,11 +347,12 @@ public class AtlasRelationshipType extends AtlasStructType {
}
attribute = new AtlasAttribute(entityType, attributeDef,
- typeRegistry.getType(attrTypeName), relationshipLabel);
+ typeRegistry.getType(attrTypeName), getTypeName(), relationshipLabel);
} else {
// attribute already exists (legacy attribute which is also a relationship attribute)
// add relationshipLabel information to existing attribute
+ attribute.setRelationshipName(getTypeName());
attribute.setRelationshipEdgeLabel(relationshipLabel);
}
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
index 84c76d7..0be7e18 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.*;
+import static org.apache.atlas.model.TypeCategory.*;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_ATTRIBUTE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_INVERSE_REF;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF;
@@ -701,19 +702,23 @@ public class AtlasStructType extends AtlasType {
private final String vertexPropertyName;
private final String vertexUniquePropertyName;
private final boolean isOwnedRef;
+ private final boolean isObjectRef;
private final String inverseRefAttributeName;
private AtlasAttribute inverseRefAttribute;
+ private String relationshipName;
private String relationshipEdgeLabel;
private AtlasRelationshipEdgeDirection relationshipEdgeDirection;
- public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType, String relationshipLabel) {
+ public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType, String relationshipName, String relationshipLabel) {
this.definedInType = definedInType;
this.attributeDef = attrDef;
this.attributeType = attributeType.getTypeForAttribute();
this.qualifiedName = getQualifiedAttributeName(definedInType.getStructDef(), attributeDef.getName());
this.vertexPropertyName = encodePropertyKey(this.qualifiedName);
this.vertexUniquePropertyName = attrDef.getIsUnique() ? encodePropertyKey(getQualifiedAttributeName(definedInType.getStructDef(), UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX + attributeDef.getName())) : null;
+ this.relationshipName = relationshipName;
this.relationshipEdgeLabel = getRelationshipEdgeLabel(relationshipLabel);
+
boolean isOwnedRef = false;
String inverseRefAttribute = null;
@@ -736,10 +741,32 @@ public class AtlasStructType extends AtlasType {
this.isOwnedRef = isOwnedRef;
this.inverseRefAttributeName = inverseRefAttribute;
this.relationshipEdgeDirection = AtlasRelationshipEdgeDirection.OUT;
+
+ switch (attributeType.getTypeCategory()) {
+ case OBJECT_ID_TYPE:
+ isObjectRef = true;
+ break;
+
+ case MAP:
+ AtlasMapType mapType = (AtlasMapType) attributeType;
+
+ isObjectRef = mapType.getValueType().getTypeCategory() == OBJECT_ID_TYPE;
+ break;
+
+ case ARRAY:
+ AtlasArrayType arrayType = (AtlasArrayType) attributeType;
+
+ isObjectRef = arrayType.getElementType().getTypeCategory() == OBJECT_ID_TYPE;
+ break;
+
+ default:
+ isObjectRef = false;
+ break;
+ }
}
public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef attrDef, AtlasType attributeType) {
- this(definedInType, attrDef, attributeType, null);
+ this(definedInType, attrDef, attributeType, null, null);
}
public AtlasStructType getDefinedInType() { return definedInType; }
@@ -766,12 +793,18 @@ public class AtlasStructType extends AtlasType {
public boolean isOwnedRef() { return isOwnedRef; }
+ public boolean isObjectRef() { return isObjectRef; }
+
public String getInverseRefAttributeName() { return inverseRefAttributeName; }
public AtlasAttribute getInverseRefAttribute() { return inverseRefAttribute; }
public void setInverseRefAttribute(AtlasAttribute inverseAttr) { inverseRefAttribute = inverseAttr; }
+ public String getRelationshipName() { return relationshipName; }
+
+ public void setRelationshipName(String relationshipName) { this.relationshipName = relationshipName; }
+
public String getRelationshipEdgeLabel() { return relationshipEdgeLabel; }
public void setRelationshipEdgeLabel(String relationshipEdgeLabel) { this.relationshipEdgeLabel = relationshipEdgeLabel; }
diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
index 3002217..1e78e25 100644
--- a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
+++ b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -126,10 +127,38 @@ public class AtlasEntityUtil {
if (val instanceof AtlasRelatedObjectId) {
ret = ((AtlasRelatedObjectId) val).getRelationshipType();
+ } else if (val instanceof Collection) {
+ String elemRelationshipType = null;
+
+ for (Object elem : (Collection) val) {
+ elemRelationshipType = getRelationshipType(elem);
+
+ if (elemRelationshipType != null) {
+ break;
+ }
+ }
+
+ ret = elemRelationshipType;
} else if (val instanceof Map) {
- Object relTypeName = ((Map) val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE);
+ Map mapValue = (Map) val;
+
+ if (mapValue.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
+ Object relTypeName = ((Map) val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE);
- ret = relTypeName != null ? relTypeName.toString() : null;
+ ret = relTypeName != null ? relTypeName.toString() : null;
+ } else {
+ String entryRelationshipType = null;
+
+ for (Object entryVal : mapValue.values()) {
+ entryRelationshipType = getRelationshipType(entryVal);
+
+ if (entryRelationshipType != null) {
+ break;
+ }
+ }
+
+ ret = entryRelationshipType;
+ }
} else {
ret = null;
}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 9df360c..19f81d3 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -537,7 +537,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
AtlasAttribute attribute = entityType.getAttribute(relation);
if (attribute != null) {
- if (isRelationshipAttribute(attribute)) {
+ if (attribute.isObjectRef()) {
relation = attribute.getRelationshipEdgeLabel();
} else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_ATTRIBUTE, relation, attribute.getTypeName());
@@ -790,23 +790,6 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return "";
}
- private boolean isRelationshipAttribute(AtlasAttribute attribute) throws AtlasBaseException {
- boolean ret = true;
- AtlasType attrType = attribute.getAttributeType();
-
- if (attrType.getTypeCategory() == ARRAY) {
- attrType = ((AtlasArrayType) attrType).getElementType();
- } else if (attrType.getTypeCategory() == MAP) {
- attrType = ((AtlasMapType) attrType).getValueType();
- }
-
- if (attrType.getTypeCategory() != OBJECT_ID_TYPE) {
- ret = false;
- }
-
- return ret;
- }
-
private Set<String> getEntityStates() {
return new HashSet<>(Arrays.asList(ACTIVE.toString(), DELETED.toString()));
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
index 51a6426..173fcee 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
@@ -134,11 +135,12 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
// Only process the requested/set attributes
for (String attrName : attributes.keySet()) {
- AtlasAttribute attr = structType.getAttribute(attrName);
+ Object v2Value = attributes.get(attrName);
+ AtlasAttribute attr = structType.getAttribute(attrName);
if (attr == null) {
if (isEntityType) {
- attr = ((AtlasEntityType) structType).getRelationshipAttribute(attrName, null);
+ attr = ((AtlasEntityType) structType).getRelationshipAttribute(attrName, AtlasEntityUtil.getRelationshipType(v2Value));
}
if (attr == null) {
@@ -149,7 +151,6 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
AtlasType attrType = attr.getAttributeType();
AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory());
- Object v2Value = attributes.get(attr.getName());
if (v2Value != null && isEntityType && attr.isOwnedRef()) {
if (LOG.isDebugEnabled()) {
@@ -256,6 +257,7 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
// Only process the requested/set attributes
for (Object attribKey : attributes.keySet()) {
String attrName = attribKey.toString();
+ Object v1Value = attributes.get(attrName);
AtlasAttribute attr = structType.getAttribute(attrName);
if (attr == null) {
@@ -271,7 +273,6 @@ public class AtlasStructFormatConverter extends AtlasAbstractFormatConverter {
AtlasType attrType = attr.getAttributeType();
AtlasFormatConverter attrConverter = converterRegistry.getConverter(attrType.getTypeCategory());
- Object v1Value = attributes.get(attrName);
if (attrConverter.isValidValueV1(v1Value, attrType)) {
Object v2Value = attrConverter.fromV1ToV2(v1Value, attrType, context);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index a62f335..a5a6291 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -323,7 +323,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
AtlasAttribute attr = entityType.getAttribute(attrName);
if (attr == null) {
- attr = entityType.getRelationshipAttribute(attrName, null);
+ attr = entityType.getRelationshipAttribute(attrName, AtlasEntityUtil.getRelationshipType(attrValue));
if (attr == null) {
throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName());
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index a6f1250..31b20ff 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -79,8 +79,43 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DE
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
-import static org.apache.atlas.repository.Constants.*;
-import static org.apache.atlas.repository.graph.GraphHelper.*;
+import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY;
+import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.HOME_ID_KEY;
+import static org.apache.atlas.repository.Constants.IS_PROXY_KEY;
+import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
+import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
+import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
+import static org.apache.atlas.repository.graph.GraphHelper.getDefaultRemovePropagations;
+import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
+import static org.apache.atlas.repository.graph.GraphHelper.getStatus;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
+import static org.apache.atlas.repository.graph.GraphHelper.isActive;
+import static org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
+import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
+import static org.apache.atlas.repository.graph.GraphHelper.string;
+import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
@@ -827,27 +862,29 @@ public class EntityGraphMapper {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, (ctx.getValue() == null ? null : ctx.getValue().toString()));
}
- String attributeName = ctx.getAttribute().getName();
- AtlasType type = typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex));
-
- AtlasRelationshipEdgeDirection edgeDirection = ctx.getAttribute().getRelationshipEdgeDirection();
+ AtlasType type = typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex));
if (type instanceof AtlasEntityType) {
AtlasEntityType entityType = (AtlasEntityType) type;
+ AtlasAttribute attribute = ctx.getAttribute();
+ String attributeName = attribute.getName();
// use relationship to create/update edges
if (entityType.hasRelationshipAttribute(attributeName)) {
Map<String, Object> relationshipAttributes = getRelationshipAttributes(ctx.getValue());
if (ctx.getCurrentEdge() != null) {
- ret = updateRelationship(ctx.getCurrentEdge(), entityVertex, attributeVertex, edgeDirection, relationshipAttributes);
-
+ ret = updateRelationship(ctx.getCurrentEdge(), entityVertex, attributeVertex, attribute.getRelationshipEdgeDirection(), relationshipAttributes);
} else {
- String relationshipName = graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName);
+ String relationshipName = attribute.getRelationshipName();
AtlasVertex fromVertex;
AtlasVertex toVertex;
- if (edgeDirection == IN) {
+ if (StringUtils.isEmpty(relationshipName)) {
+ relationshipName = graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName);
+ }
+
+ if (attribute.getRelationshipEdgeDirection() == IN) {
fromVertex = attributeVertex;
toVertex = entityVertex;
@@ -1106,21 +1143,29 @@ public class EntityGraphMapper {
}
private static AtlasObjectId getObjectId(Object val) throws AtlasBaseException {
+ AtlasObjectId ret = null;
+
if (val != null) {
if ( val instanceof AtlasObjectId) {
- return ((AtlasObjectId) val);
+ ret = ((AtlasObjectId) val);
} else if (val instanceof Map) {
- AtlasObjectId ret = new AtlasObjectId((Map)val);
+ Map map = (Map) val;
- if (AtlasTypeUtil.isValid(ret)) {
- return ret;
+ if (map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
+ ret = new AtlasRelatedObjectId(map);
+ } else {
+ ret = new AtlasObjectId((Map) val);
}
- }
- throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+ if (!AtlasTypeUtil.isValid(ret)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+ }
+ } else {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+ }
}
- return null;
+ return ret;
}
private static String getGuid(Object val) throws AtlasBaseException {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index d16d544..d1d6003 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -35,6 +35,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
@@ -56,12 +57,14 @@ import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.filters.AuditFilter.AuditLog;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
@@ -75,6 +78,7 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -86,6 +90,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
+import static org.apache.atlas.model.instance.AtlasObjectId.*;
+import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS;
+
/**
* Consumer of notifications from hooks e.g., hive hook etc.
@@ -174,7 +181,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default
- commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 0);
+ commitBatchSize = applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
@@ -216,8 +223,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
hiveTablesCache = Collections.emptyMap();
}
- hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, false);
- rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, false);
+ hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
+ rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
preprocessEnabled = !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs;
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
@@ -704,6 +711,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
} else {
+ Map<String, String> guidAssignments = new HashMap<>();
+
for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
int toIndex = fromIdx + commitBatchSize;
@@ -711,10 +720,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
toIndex = entitiesList.size();
}
- AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(new ArrayList<>(entitiesList.subList(fromIdx, toIndex)));
+ List<AtlasEntity> entitiesBatch = new ArrayList<>(entitiesList.subList(fromIdx, toIndex));
+
+ updateProcessedEntityReferences(entitiesBatch, guidAssignments);
+
+ AtlasEntitiesWithExtInfo batch = new AtlasEntitiesWithExtInfo(entitiesBatch);
AtlasEntityStream batchStream = new AtlasEntityStream(batch, entityStream);
- atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
+ EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
+
+ recordProcessedEntities(response, guidAssignments);
RequestContext.get().resetEntityGuidUpdates();
@@ -801,7 +816,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
PreprocessorContext context = new PreprocessorContext(kafkaMsg, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
- if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs) {
+ if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
}
@@ -814,6 +829,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
context.moveRegisteredReferredEntities();
+
+ if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities())) {
+ // move hive_process and hive_column_lineage entities to end of the list
+ List<AtlasEntity> entities = context.getEntities();
+ int count = entities.size();
+
+ for (int i = 0; i < count; i++) {
+ AtlasEntity entity = entities.get(i);
+
+ switch (entity.getTypeName()) {
+ case TYPE_HIVE_PROCESS:
+ case TYPE_HIVE_COLUMN_LINEAGE:
+ entities.remove(i--);
+ entities.add(entity);
+ count--;
+ break;
+ }
+ }
+
+ if (entities.size() - count > 0) {
+ LOG.info("moved {} hive_process/hive_column_lineage entities to end of list (listSize={})", entities.size() - count, entities.size());
+ }
+ }
}
private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
@@ -896,7 +934,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (lineageQNames.contains(qualifiedName)) {
entities.remove(i--);
- LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, lineageInputsCount, context.getKafkaMessageOffset(), context.getKafkaPartition());
+ LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
numRemovedEntities++;
@@ -965,6 +1003,107 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return ret;
}
+ private void recordProcessedEntities(EntityMutationResponse mutationResponse, Map<String, String> guidAssignments) { // accumulates the guid assignments reported by a mutation response into the caller-supplied map
+ if (mutationResponse != null && MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) { // nothing to record for a null response or one without assignments
+ guidAssignments.putAll(mutationResponse.getGuidAssignments()); // mutates the caller's map; an existing key is overwritten by the newer assignment
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("recordProcessedEntities: added {} guidAssignments. updatedSize={}", mutationResponse.getGuidAssignments().size(), guidAssignments.size());
+ }
+ }
+ }
+
+ private void updateProcessedEntityReferences(List<AtlasEntity> entities, Map<String, String> guidAssignments) { // rewrites object references held by the given entities using guidAssignments; no-op when either argument is empty
+ if (CollectionUtils.isNotEmpty(entities) && MapUtils.isNotEmpty(guidAssignments)) {
+ for (AtlasEntity entity : entities) {
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+ if (entityType == null) { // type not found in the registry: the entity is left untouched
+ continue;
+ }
+
+ if (MapUtils.isNotEmpty(entity.getAttributes())) {
+ for (Map.Entry<String, Object> entry : entity.getAttributes().entrySet()) {
+ String attrName = entry.getKey();
+ Object attrValue = entry.getValue();
+
+ if (attrValue == null) {
+ continue;
+ }
+
+ AtlasAttribute attribute = entityType.getAttribute(attrName);
+
+ if (attribute == null) { // look for a relationship attribute with the same name
+ attribute = entityType.getRelationshipAttribute(attrName, null);
+ }
+
+ if (attribute != null && attribute.isObjectRef()) { // only values of object-reference attributes are inspected; other attribute values are skipped
+ updateProcessedEntityReferences(attrValue, guidAssignments);
+ }
+ }
+ }
+
+ if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
+ for (Map.Entry<String, Object> entry : entity.getRelationshipAttributes().entrySet()) {
+ Object attrValue = entry.getValue();
+
+ if (attrValue != null) { // unlike regular attributes, relationship attribute values are passed on without an attribute-definition check
+ updateProcessedEntityReferences(attrValue, guidAssignments);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void updateProcessedEntityReferences(Object objVal, Map<String, String> guidAssignments) { // dispatches on the runtime type of objVal to the matching overload; values of any other type are ignored
+ if (objVal instanceof AtlasObjectId) {
+ updateProcessedEntityReferences((AtlasObjectId) objVal, guidAssignments);
+ } else if (objVal instanceof Collection) { // each element is handled individually by the Collection overload
+ updateProcessedEntityReferences((Collection) objVal, guidAssignments);
+ } else if (objVal instanceof Map) { // the Map overload reads KEY_GUID directly from this map, i.e. treats it as a single reference - NOTE(review): confirm map-typed attribute values never reach here
+ updateProcessedEntityReferences((Map) objVal, guidAssignments);
+ }
+ }
+
+ private void updateProcessedEntityReferences(AtlasObjectId objId, Map<String, String> guidAssignments) { // if objId's guid is a key in guidAssignments, repoints objId at the mapped guid (mutates objId in place)
+ String guid = objId.getGuid();
+
+ if (guid != null && guidAssignments.containsKey(guid)) { // references whose guid is not in the map are left unchanged
+ String assignedGuid = guidAssignments.get(guid);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", objId.getTypeName(), guid, assignedGuid);
+ }
+
+ objId.setGuid(assignedGuid);
+ objId.setTypeName(null); // typeName and uniqueAttributes are cleared so that only the assigned guid remains on the reference
+ objId.setUniqueAttributes(null);
+ }
+ }
+
+ private void updateProcessedEntityReferences(Map objId, Map<String, String> guidAssignments) { // Map-based counterpart of the AtlasObjectId overload: same rewrite, using the KEY_GUID / KEY_TYPENAME / KEY_UNIQUE_ATTRIBUTES entries (mutates the map in place)
+ Object guid = objId.get(KEY_GUID);
+
+ if (guid != null && guidAssignments.containsKey(guid)) { // maps without a KEY_GUID entry, or with a guid that is not in guidAssignments, are left unchanged
+ String assignedGuid = guidAssignments.get(guid);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", objId.get(KEY_TYPENAME), guid, assignedGuid);
+ }
+
+ objId.put(KEY_GUID, assignedGuid);
+ objId.remove(KEY_TYPENAME); // type name and unique attributes are dropped so that only the assigned guid remains in the map
+ objId.remove(KEY_UNIQUE_ATTRIBUTES);
+ }
+ }
+
+ private void updateProcessedEntityReferences(Collection objIds, Map<String, String> guidAssignments) { // applies the Object overload to every element of the collection
+ for (Object objId : objIds) { // elements may be of any type; the Object overload ignores types it does not handle
+ updateProcessedEntityReferences(objId, guidAssignments);
+ }
+ }
+
static class FailedCommitOffsetRecorder {
private Long currentOffset;
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index ff9c9cb..9d6ad22 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -36,7 +36,7 @@ public class HivePreprocessor {
private static final Logger LOG = LoggerFactory.getLogger(HivePreprocessor.class);
private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS = "hive_table_columns";
- private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionKeys";
+ private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = "hive_table_partitionkeys";
static class HiveTablePreprocessor extends EntityPreprocessor {
public HiveTablePreprocessor() {
@@ -76,7 +76,7 @@ public class HivePreprocessor {
}
private void removeColumnsAttributeAndRegisterToMove(AtlasEntity entity, String attrName, String relationshipType, PreprocessorContext context) {
- Object attrVal = entity.getAttribute(attrName);
+ Object attrVal = entity.removeAttribute(attrName);
if (attrVal != null) {
Set<String> guids = new HashSet<>();
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 94e0993..c85c1b8 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -91,6 +91,10 @@ public class PreprocessorContext {
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; }
+ public boolean isHivePreprocessEnabled() { // true when at least one hive preprocessing option is set on this context
+ return !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || hiveTypesRemoveOwnedRefAttrs; // same condition that NotificationHookConsumer previously evaluated inline (removed line in the hunk above)
+ }
+
public List<AtlasEntity> getEntities() { // returns the entity list held by entitiesWithExtInfo
return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() : null; // null when entitiesWithExtInfo is null - callers must null-check
}