You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2020/04/13 00:09:55 UTC
[atlas] branch branch-2.0 updated: ATLAS-3689: added entity-audit
entries on business attributes add/update/delete to an entity
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 48b0566 ATLAS-3689: added entity-audit entries on business attributes add/update/delete to an entity
48b0566 is described below
commit 48b056689bc19ac01f253898c37a7f444c0c6852
Author: Mandar Ambawane <ma...@freestoneinfotech.com>
AuthorDate: Sun Apr 12 13:22:02 2020 +0530
ATLAS-3689: added entity-audit entries on business attributes add/update/delete to an entity
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
(cherry picked from commit 80135a8dbedfbaff444267681aa13d958e867c7e)
---
dashboardv2/public/js/utils/Enums.js | 5 +-
.../js/views/audit/CreateAuditTableLayoutView.js | 8 +-
.../atlas/listener/EntityChangeListenerV2.java | 9 +++
.../atlas/model/audit/EntityAuditEventV2.java | 5 +-
.../repository/audit/EntityAuditListenerV2.java | 82 ++++++++++++-------
.../store/graph/v2/AtlasEntityChangeNotifier.java | 26 +++++-
.../store/graph/v2/EntityGraphMapper.java | 84 ++++++++++++++-----
.../store/graph/v2/IAtlasEntityChangeNotifier.java | 57 +++++++++++++
.../v2/bulkimport/EntityChangeNotifierNop.java | 94 ++++++++++++++++++++++
.../notification/EntityNotificationListenerV2.java | 5 ++
10 files changed, 317 insertions(+), 58 deletions(-)
diff --git a/dashboardv2/public/js/utils/Enums.js b/dashboardv2/public/js/utils/Enums.js
index e2d8cd2..780dad2 100644
--- a/dashboardv2/public/js/utils/Enums.js
+++ b/dashboardv2/public/js/utils/Enums.js
@@ -39,8 +39,7 @@ define(['require'], function(require) {
LABEL_ADD: "Label(s) Added",
LABEL_DELETE: "Label(s) Deleted",
ENTITY_PURGE: "Entity Purged",
- BUSINESS_ATTRIBUTE_ADD: "Business Attribute(s) Added",
- BUSINESS_ATTRIBUTE_DELETE: "Business Attribute(s) Deleted"
+ BUSINESS_ATTRIBUTE_UPDATE: "Business Attribute(s) Updated"
}
Enums.entityStateReadOnly = {
@@ -214,4 +213,4 @@ define(['require'], function(require) {
1: "true"
};
return Enums;
-});
\ No newline at end of file
+});
diff --git a/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js b/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
index a820616..f7673ce 100644
--- a/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
+++ b/dashboardv2/public/js/views/audit/CreateAuditTableLayoutView.js
@@ -124,7 +124,11 @@ define(['require',
relationshipAttributes = parseDetailsObject.relationshipAttributes;
if (attributesDetails) {
that.ui.attributeDetails.removeClass('hide');
- that.action.indexOf("Classification") === -1 ? that.ui.panelAttrHeading.html("Technical properties ") : that.ui.panelAttrHeading.html("Properties ");
+ if (that.action.includes("Classification") || that.action.includes("Business Attribute") != -1) {
+ that.ui.panelAttrHeading.html("Properties ");
+ } else {
+ that.ui.panelAttrHeading.html("Technical properties ");
+ }
var attrTable = that.createTableWithValues(attributesDetails);
that.ui.attributeCard.html(
attrTable);
@@ -174,4 +178,4 @@ define(['require',
}
});
return CreateAuditTableLayoutView;
-});
\ No newline at end of file
+});
diff --git a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
index 2394a12..374d0dd 100644
--- a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
+++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -173,4 +174,12 @@ public interface EntityChangeListenerV2 {
* @throws AtlasBaseException if the listener notification fails
*/
void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws AtlasBaseException;
+
+ /**
+ * Called when business attributes of an entity are added, updated or removed.
+ *
+ * @param entity the entity
+ * @param updatedBusinessAttributes the changed attributes, keyed by business-metadata type name and then by attribute name
+ * @throws AtlasBaseException if the listener notification fails
+ */
+ void onBusinessAttributesUpdated(AtlasEntity entity, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException;
}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
index 9301e21..bcfdd94 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -50,7 +50,8 @@ public class EntityAuditEventV2 implements Serializable {
ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE,
CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
PROPAGATED_CLASSIFICATION_ADD, PROPAGATED_CLASSIFICATION_DELETE, PROPAGATED_CLASSIFICATION_UPDATE,
- TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE;
+ TERM_ADD, TERM_DELETE, LABEL_ADD, LABEL_DELETE, ENTITY_PURGE,
+ BUSINESS_ATTRIBUTE_UPDATE;
public static EntityAuditActionV2 fromString(String strValue) {
switch (strValue) {
@@ -91,6 +92,8 @@ public class EntityAuditEventV2 implements Serializable {
return LABEL_ADD;
case "LABEL_DELETE":
return LABEL_DELETE;
+ case "BUSINESS_ATTRIBUTE_UPDATE":
+ return BUSINESS_ATTRIBUTE_UPDATE;
}
throw new IllegalArgumentException("No enum constant " + EntityAuditActionV2.class.getCanonicalName() + "." + strValue);
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index cab4e1e..4c1e1a9 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -28,11 +28,13 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -50,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.BUSINESS_ATTRIBUTE_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_UPDATE;
@@ -341,6 +344,57 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
}
}
+ @Override
+ public void onRelationshipsAdded(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("New relationship(s) added to repository(" + relationships.size() + ")");
+ }
+ }
+
+ @Override
+ public void onRelationshipsUpdated(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Relationship(s) updated(" + relationships.size() + ")");
+ }
+ }
+
+ @Override
+ public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
+ }
+ }
+
+ @Override
+ public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
+ }
+ }
+
+ /**
+ * Writes one BUSINESS_ATTRIBUTE_UPDATE audit event for each business-metadata entry in the given map.
+ *
+ * @param entity the entity the audit events are created for
+ * @param updatedBusinessAttributes changed attributes keyed by business-metadata name; nothing is written when null or empty
+ * @throws AtlasBaseException declared by the listener contract; presumably raised when the audit repository cannot store the events
+ */
+ @Override
+ public void onBusinessAttributesUpdated(AtlasEntity entity, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException {
+ if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+ MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
+
+ // end the metric record even if building or storing the events throws
+ try {
+ List<EntityAuditEventV2> auditEvents = new ArrayList<>();
+
+ for (Map.Entry<String, Map<String, Object>> entry : updatedBusinessAttributes.entrySet()) {
+ String bmName = entry.getKey();
+ Map<String, Object> attributes = entry.getValue();
+ String details = AtlasJson.toJson(new AtlasStruct(bmName, attributes));
+ EntityAuditEventV2 auditEvent = createEvent(entity, BUSINESS_ATTRIBUTE_UPDATE, "Updated business attributes: " + details);
+
+ auditEvents.add(auditEvent);
+ }
+
+ auditRepository.putEventsV2(auditEvents);
+ } finally {
+ RequestContext.get().endMetricRecord(metric);
+ }
+ }
+ }
+
+
private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditActionV2 action, String details) {
return new EntityAuditEventV2(entity.getGuid(), RequestContext.get().getRequestTime(),
RequestContext.get().getUser(), action, details, entity);
@@ -566,32 +620,4 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
return ret;
}
-
- @Override
- public void onRelationshipsAdded(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("New relationship(s) added to repository(" + relationships.size() + ")");
- }
- }
-
- @Override
- public void onRelationshipsUpdated(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Relationship(s) updated(" + relationships.size() + ")");
- }
- }
-
- @Override
- public void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean isImport) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Relationship(s) deleted from repository(" + relationships.size() + ")");
- }
- }
-
- @Override
- public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Relationship(s) purged from repository(" + relationships.size() + ")");
- }
- }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index d7020a7..0dc3193 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -66,7 +66,7 @@ import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
@Component
-public class AtlasEntityChangeNotifier {
+public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
private final Set<EntityChangeListener> entityChangeListeners;
@@ -91,6 +91,7 @@ public class AtlasEntityChangeNotifier {
this.isV2EntityNotificationEnabled = AtlasRepositoryConfiguration.isV2EntityNotificationEnabled();
}
+ @Override
public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
@@ -119,6 +120,7 @@ public class AtlasEntityChangeNotifier {
notifyPropagatedEntities();
}
+ @Override
public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
if (CollectionUtils.isEmpty(entityChangeListeners)) {
return;
@@ -137,6 +139,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) {
doFullTextMapping(entity.getGuid());
@@ -166,6 +169,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
if (isV2EntityNotificationEnabled) {
doFullTextMappingHelper(entities);
@@ -201,6 +205,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
doFullTextMapping(entity.getGuid());
@@ -228,6 +233,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
doFullTextMapping(entity.getGuid());
@@ -255,6 +261,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
doFullTextMappingHelper(entities);
@@ -288,6 +295,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity association only if v2 notifications are enabled
if (isV2EntityNotificationEnabled) {
@@ -307,6 +315,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
// listeners notified on term-entity disassociation only if v2 notifications are enabled
if (isV2EntityNotificationEnabled) {
@@ -326,6 +335,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
doFullTextMapping(entityGuid);
@@ -339,6 +349,7 @@ public class AtlasEntityChangeNotifier {
}
}
+ @Override
public void notifyPropagatedEntities() throws AtlasBaseException {
RequestContext context = RequestContext.get();
Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations();
@@ -348,6 +359,18 @@ public class AtlasEntityChangeNotifier {
notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE);
}
+ /**
+ * Forwards a business-attribute change to every registered V2 listener; does nothing unless V2 entity notifications are enabled.
+ *
+ * @param entityGuid guid of the entity whose business attributes changed
+ * @param updatedBusinessAttributes the changed business attributes, handed to each listener as received
+ * @throws AtlasBaseException if a listener fails or, presumably, if the entity lookup fails
+ */
+ @Override
+ public void onBusinessAttributesUpdated(String entityGuid, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException {
+ if (!isV2EntityNotificationEnabled) {
+ return;
+ }
+
+ AtlasEntity changedEntity = instanceConverter.getAndCacheEntity(entityGuid);
+
+ for (EntityChangeListenerV2 v2Listener : entityChangeListenersV2) {
+ v2Listener.onBusinessAttributesUpdated(changedEntity, updatedBusinessAttributes);
+ }
+ }
+
+
private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action) throws AtlasBaseException {
if (MapUtils.isEmpty(entityPropagationMap) || action == null) {
return;
@@ -752,5 +775,4 @@ public class AtlasEntityChangeNotifier {
}
}
}
-
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 9269ae6..fc09015 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -399,24 +399,6 @@ public class EntityGraphMapper {
}
}
- private void updateLabels(AtlasVertex vertex, Set<String> labels) {
- if (CollectionUtils.isNotEmpty(labels)) {
- AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY, getLabelString(labels));
- } else {
- vertex.removeProperty(LABELS_PROPERTY_KEY);
- }
- }
-
- private String getLabelString(Collection<String> labels) {
- String ret = null;
-
- if (!labels.isEmpty()) {
- ret = LABEL_NAME_DELIMITER + String.join(LABEL_NAME_DELIMITER, labels) + LABEL_NAME_DELIMITER;
- }
-
- return ret;
- }
-
/*
* reset/overwrite business attributes of the entity with given values
*/
@@ -426,6 +408,7 @@ public class EntityGraphMapper {
}
Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes();
+ Map<String, Map<String, Object>> updatedBusinessAttributes = new HashMap<>();
for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) {
String bmName = entry.getKey();
@@ -444,6 +427,8 @@ public class EntityGraphMapper {
}
mapAttribute(bmAttribute, bmAttrNewValue, entityVertex, CREATE, new EntityMutationContext());
+
+ addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrNewValue);
}
} else {
if (bmAttrNewValue != null) {
@@ -453,6 +438,8 @@ public class EntityGraphMapper {
}
mapAttribute(bmAttribute, bmAttrNewValue, entityVertex, UPDATE, new EntityMutationContext());
+
+ addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrNewValue);
}
} else {
if (LOG.isDebugEnabled()) {
@@ -460,11 +447,17 @@ public class EntityGraphMapper {
}
entityVertex.removeProperty(bmAttribute.getVertexPropertyName());
+
+ addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrNewValue);
}
}
}
}
+ if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+ entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== setBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
}
@@ -479,6 +472,7 @@ public class EntityGraphMapper {
}
Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes();
+ Map<String, Map<String, Object>> updatedBusinessAttributes = new HashMap<>();
if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) && MapUtils.isNotEmpty(businessAttributes)) {
for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) {
@@ -503,16 +497,24 @@ public class EntityGraphMapper {
if (existingValue == null) {
if (bmAttrValue != null) {
mapAttribute(bmAttribute, bmAttrValue, entityVertex, CREATE, new EntityMutationContext());
+
+ addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrValue);
}
} else {
if (!Objects.equals(existingValue, bmAttrValue)) {
mapAttribute(bmAttribute, bmAttrValue, entityVertex, UPDATE, new EntityMutationContext());
+
+ addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, bmAttrValue);
}
}
}
}
}
+ if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+ entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== addOrUpdateBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
}
@@ -521,12 +523,13 @@ public class EntityGraphMapper {
/*
* remove the given business attributes from the entity
*/
- public void removeBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType entityType, Map<String, Map<String, Object>> businessAttributes) {
+ public void removeBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType entityType, Map<String, Map<String, Object>> businessAttributes) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> removeBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
}
Map<String, Map<String, AtlasBusinessAttribute>> entityTypeBusinessAttributes = entityType.getBusinessAttributes();
+ Map<String, Map<String, Object>> updatedBusinessAttributes = new HashMap<>();
if (MapUtils.isNotEmpty(entityTypeBusinessAttributes) && MapUtils.isNotEmpty(businessAttributes)) {
for (Map.Entry<String, Map<String, AtlasBusinessAttribute>> entry : entityTypeBusinessAttributes.entrySet()) {
@@ -539,16 +542,22 @@ public class EntityGraphMapper {
Map<String, Object> entityBmAttributes = businessAttributes.get(bmName);
- for (AtlasBusinessAttribute bmttribute : bmAttributes.values()) {
+ for (AtlasBusinessAttribute bmAttribute : bmAttributes.values()) {
// if (entityBmAttributes is empty) remove all attributes in this business-metadata
// else remove the attribute only if its given in entityBmAttributes
- if (MapUtils.isEmpty(entityBmAttributes) || entityBmAttributes.containsKey(bmttribute.getName())) {
- entityVertex.removeProperty(bmttribute.getVertexPropertyName());
+ if (MapUtils.isEmpty(entityBmAttributes) || entityBmAttributes.containsKey(bmAttribute.getName())) {
+ entityVertex.removeProperty(bmAttribute.getVertexPropertyName());
+
+ addToUpdatedBusinessAttributes(updatedBusinessAttributes, bmAttribute, null);
}
}
}
}
+ if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
+ entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== removeBusinessAttributes(entityVertex={}, entityType={}, businessAttributes={}", entityVertex, entityType.getTypeName(), businessAttributes);
}
@@ -2559,4 +2568,35 @@ public class EntityGraphMapper {
return propagatedEntities;
}
+
+ private void updateLabels(AtlasVertex vertex, Set<String> labels) {
+ if (CollectionUtils.isNotEmpty(labels)) {
+ AtlasGraphUtilsV2.setEncodedProperty(vertex, LABELS_PROPERTY_KEY, getLabelString(labels));
+ } else {
+ vertex.removeProperty(LABELS_PROPERTY_KEY);
+ }
+ }
+
+ private String getLabelString(Collection<String> labels) {
+ String ret = null;
+
+ if (!labels.isEmpty()) {
+ ret = LABEL_NAME_DELIMITER + String.join(LABEL_NAME_DELIMITER, labels) + LABEL_NAME_DELIMITER;
+ }
+
+ return ret;
+ }
+
+ /**
+ * Records a changed business attribute in the given map, grouped under the type name of the type that defines the attribute.
+ *
+ * @param updatedBusinessAttributes accumulator of defining type name to (attribute name to value); modified in place
+ * @param bmAttribute the business attribute that changed
+ * @param attrValue the value to record; callers in this class pass null for a removed attribute
+ */
+ private void addToUpdatedBusinessAttributes(Map<String, Map<String, Object>> updatedBusinessAttributes, AtlasBusinessAttribute bmAttribute, Object attrValue) {
+ String bmName = bmAttribute.getDefinedInType().getTypeName();
+
+ // computeIfAbsent creates the per-type attribute map the first time a type name is seen
+ updatedBusinessAttributes.computeIfAbsent(bmName, key -> new HashMap<>()).put(bmAttribute.getName(), attrValue);
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
new file mode 100644
index 0000000..f617ef9
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IAtlasEntityChangeNotifier.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Notification callbacks for entity changes: entity mutations, relationship mutations, classifications,
+ * glossary terms, labels and business attributes.
+ * Implemented by AtlasEntityChangeNotifier and by the no-op EntityChangeNotifierNop.
+ */
+public interface IAtlasEntityChangeNotifier {
+ void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException;
+
+ void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException;
+
+ void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+ void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException;
+
+ void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+ void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException;
+
+ void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+ void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException;
+
+ void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException;
+
+ void notifyPropagatedEntities() throws AtlasBaseException;
+
+ void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException;
+
+ void onBusinessAttributesUpdated(String entityGuid, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException;
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
new file mode 100644
index 0000000..05516f2
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/EntityChangeNotifierNop.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * No-op implementation of IAtlasEntityChangeNotifier: every callback has an empty body, so nothing is notified.
+ * NOTE(review): the bulkimport package suggests this is meant for bulk-import flows that skip change
+ * notifications - confirm against the callers.
+ */
+public class EntityChangeNotifierNop implements IAtlasEntityChangeNotifier {
+ @Override
+ public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void notifyRelationshipMutation(AtlasRelationship relationship, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationsAddedToEntities(List<AtlasEntity> entities, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationsDeletedFromEntities(List<AtlasEntity> entities, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onLabelsUpdatedFromEntity(String entityGuid, Set<String> addedLabels, Set<String> deletedLabels) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void notifyPropagatedEntities() throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
+
+ }
+
+ @Override
+ public void onBusinessAttributesUpdated(String entityGuid, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException {
+
+ }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index 6d64fec..a677b31 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -316,4 +316,9 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
public void onRelationshipsPurged(List<AtlasRelationship> relationships) throws AtlasBaseException {
// do nothing -> notification not sent out for term purged from entities as its been sent in case of delete
}
+
+ @Override
+ public void onBusinessAttributesUpdated(AtlasEntity entity, Map<String, Map<String, Object>> updatedBusinessAttributes) throws AtlasBaseException{
+ // do nothing -> no notification is sent out when business attributes of an entity are updated
+ }
}
\ No newline at end of file