You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2021/01/22 07:48:32 UTC
[atlas] branch master updated: ATLAS-4019: audit improvement to store only delta
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 8d7ac77 ATLAS-4019: audit improvement to store only delta
8d7ac77 is described below
commit 8d7ac77a74479263ce59b9e22bb3def91b8b9326
Author: Deep Singh <de...@gmail.com>
AuthorDate: Thu Jan 21 15:41:35 2021 -0800
ATLAS-4019: audit improvement to store only delta
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
---
.../java/org/apache/atlas/AtlasConfiguration.java | 5 +
.../test/java/org/apache/atlas/TestUtilsV2.java | 1 +
.../repository/audit/EntityAuditListenerV2.java | 32 +++-
.../store/graph/v2/AtlasEntityComparator.java | 213 +++++++++++++++++++++
.../store/graph/v2/AtlasEntityStoreV2.java | 134 +++----------
.../store/graph/v2/AtlasEntityComparatorTest.java | 204 ++++++++++++++++++++
.../store/graph/v2/AtlasEntityStoreV2Test.java | 49 ++++-
.../main/java/org/apache/atlas/RequestContext.java | 13 ++
8 files changed, 540 insertions(+), 111 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 396aad0..08d6c9d 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -75,6 +75,7 @@ public enum AtlasConfiguration {
HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"),
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true),
REBUILD_INDEX("atlas.rebuild.index", false),
+ STORE_DIFFERENTIAL_AUDITS("atlas.entity.audit.differential", false),
DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true);
private static final Configuration APPLICATION_PROPERTIES;
@@ -95,6 +96,10 @@ public enum AtlasConfiguration {
this.defaultValue = defaultValue;
}
+ public String getPropertyName() {
+ return propertyName;
+ }
+
public int getInt() {
return APPLICATION_PROPERTIES.getInt(propertyName, Integer.valueOf(defaultValue.toString()).intValue());
}
diff --git a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
index 2b9cf6e..80ab6b4 100755
--- a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
+++ b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java
@@ -566,6 +566,7 @@ public final class TestUtilsV2 {
public static final String SERDE_TYPE = "serdeType";
public static final String COLUMNS_MAP = "columnsMap";
public static final String COLUMNS_ATTR_NAME = "columns";
+ public static final String TABLE_ATTR_NAME = "table";
public static final String ENTITY_TYPE_WITH_SIMPLE_ATTR = "entity_with_simple_attr";
public static final String ENTITY_TYPE_WITH_NESTED_COLLECTION_ATTR = "entity_with_nested_collection_attr";
public static final String ENTITY_TYPE_WITH_COMPLEX_COLLECTION_ATTR = "entity_with_complex_collection_attr";
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 0043f1c..0b7d000 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -47,11 +47,13 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.nio.charset.StandardCharsets;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.atlas.AtlasConfiguration.STORE_DIFFERENTIAL_AUDITS;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.BUSINESS_ATTRIBUTE_UPDATE;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD;
import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_DELETE;
@@ -106,14 +108,30 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
@Override
public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException {
- MetricRecorder metric = RequestContext.get().startMetricRecord("entityAudit");
-
+ RequestContext reqContext = RequestContext.get();
+ MetricRecorder metric = reqContext.startMetricRecord("entityAudit");
FixedBufferList<EntityAuditEventV2> updatedEvents = getAuditEventsList();
- for (AtlasEntity entity : entities) {
- EntityAuditActionV2 action = isImport ? ENTITY_IMPORT_UPDATE :
- RequestContext.get().checkIfEntityIsForCustomAttributeUpdate(entity.getGuid()) ? CUSTOM_ATTRIBUTE_UPDATE :
- RequestContext.get().checkIfEntityIsForBusinessAttributeUpdate(entity.getGuid()) ? BUSINESS_ATTRIBUTE_UPDATE :
- ENTITY_UPDATE;
+ Collection<AtlasEntity> updatedEntites;
+
+ if (STORE_DIFFERENTIAL_AUDITS.getBoolean()) {
+ updatedEntites = reqContext.getDifferentialEntities();
+ } else {
+ updatedEntites = entities;
+ }
+
+ for (AtlasEntity entity : updatedEntites) {
+ final EntityAuditActionV2 action;
+
+ if (isImport) {
+ action = ENTITY_IMPORT_UPDATE;
+ } else if (reqContext.checkIfEntityIsForCustomAttributeUpdate(entity.getGuid())) {
+ action = CUSTOM_ATTRIBUTE_UPDATE;
+ } else if (reqContext.checkIfEntityIsForBusinessAttributeUpdate(entity.getGuid())) {
+ action = BUSINESS_ATTRIBUTE_UPDATE;
+ } else {
+ action = ENTITY_UPDATE;
+ }
+
createEvent(updatedEvents.next(), entity, action);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparator.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparator.java
new file mode 100644
index 0000000..d962385
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparator.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasEntityUtil;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
+
+public class AtlasEntityComparator {
+ private final AtlasTypeRegistry typeRegistry;
+ private final EntityGraphRetriever entityRetriever;
+ private final Map<String, String> guidRefMap;
+ private final boolean skipClassificationCompare;
+ private final boolean skipBusinessAttributeCompare;
+
+ public AtlasEntityComparator(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, Map<String, String> guidRefMap,
+ boolean skipClassificationCompare, boolean skipBusinessAttributeCompare) {
+ this.typeRegistry = typeRegistry;
+ this.entityRetriever = entityRetriever;
+ this.guidRefMap = guidRefMap;
+ this.skipClassificationCompare = skipClassificationCompare;
+ this.skipBusinessAttributeCompare = skipBusinessAttributeCompare;
+ }
+
+ public AtlasEntityDiffResult getDiffResult(AtlasEntity updatedEntity, AtlasEntity storedEntity, boolean findOnlyFirstDiff) throws AtlasBaseException {
+ return getDiffResult(updatedEntity, storedEntity, null, findOnlyFirstDiff);
+ }
+
+ public AtlasEntityDiffResult getDiffResult(AtlasEntity updatedEntity, AtlasVertex storedVertex, boolean findOnlyFirstDiff) throws AtlasBaseException {
+ return getDiffResult(updatedEntity, null, storedVertex, findOnlyFirstDiff);
+ }
+
+ private AtlasEntityDiffResult getDiffResult(AtlasEntity updatedEntity, AtlasEntity storedEntity, AtlasVertex storedVertex, boolean findOnlyFirstDiff) throws AtlasBaseException {
+ AtlasEntity diffEntity = new AtlasEntity(updatedEntity.getTypeName());
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(updatedEntity.getTypeName());
+ Map<String, AtlasAttribute> entityTypeAttributes = entityType.getAllAttributes();
+ Map<String, Map<String, AtlasAttribute>> entityTypeRelationshipAttributes = entityType.getRelationshipAttributes();
+
+ int sectionsWithDiff = 0;
+ boolean hasDiffInAttributes = false;
+ boolean hasDiffInRelationshipAttributes = false;
+ boolean hasDiffInCustomAttributes = false;
+ boolean hasDiffInBusinessAttributes = false;
+
+ diffEntity.setGuid(updatedEntity.getGuid());
+
+ if (MapUtils.isNotEmpty(updatedEntity.getAttributes())) { // check for attribute value change
+ for (Map.Entry<String, Object> entry : updatedEntity.getAttributes().entrySet()) {
+ String attrName = entry.getKey();
+ AtlasAttribute attribute = entityTypeAttributes.get(attrName);
+
+ if (attribute == null) { // no such attribute
+ continue;
+ }
+
+ Object newVal = entry.getValue();
+ Object currVal = (storedEntity != null) ? storedEntity.getAttribute(attrName) : entityRetriever.getEntityAttribute(storedVertex, attribute);
+
+ if (!attribute.getAttributeType().areEqualValues(currVal, newVal, guidRefMap)) {
+ hasDiffInAttributes = true;
+
+ diffEntity.setAttribute(attrName, newVal);
+
+ if (findOnlyFirstDiff) {
+ return new AtlasEntityDiffResult(diffEntity, true, false, false);
+ }
+ }
+ }
+
+ if (hasDiffInAttributes) {
+ sectionsWithDiff++;
+ }
+ }
+
+ if (MapUtils.isNotEmpty(updatedEntity.getRelationshipAttributes())) { // check for relationship-attribute value change
+ for (Map.Entry<String, Object> entry : updatedEntity.getRelationshipAttributes().entrySet()) {
+ String attrName = entry.getKey();
+
+ if (!entityTypeRelationshipAttributes.containsKey(attrName)) { // no such attribute
+ continue;
+ }
+
+ Object newVal = entry.getValue();
+ String relationshipType = AtlasEntityUtil.getRelationshipType(newVal);
+ AtlasAttribute attribute = entityType.getRelationshipAttribute(attrName, relationshipType);
+ Object currVal = (storedEntity != null) ? storedEntity.getRelationshipAttribute(attrName) : entityRetriever.getEntityAttribute(storedVertex, attribute);
+
+ if (!attribute.getAttributeType().areEqualValues(currVal, newVal, guidRefMap)) {
+ hasDiffInRelationshipAttributes = true;
+
+ diffEntity.setRelationshipAttribute(attrName, newVal);
+
+ if (findOnlyFirstDiff) {
+ return new AtlasEntityDiffResult(diffEntity, true, false, false);
+ }
+ }
+ }
+
+ if (hasDiffInRelationshipAttributes) {
+ sectionsWithDiff++;
+ }
+ }
+
+ if (!skipClassificationCompare) {
+ List<AtlasClassification> newVal = updatedEntity.getClassifications();
+ List<AtlasClassification> currVal = (storedEntity != null) ? storedEntity.getClassifications() : entityRetriever.getAllClassifications(storedVertex);
+
+ if (!Objects.equals(currVal, newVal)) {
+ diffEntity.setClassifications(newVal);
+
+ sectionsWithDiff++;
+
+ if (findOnlyFirstDiff) {
+ return new AtlasEntityDiffResult(diffEntity, true, false, false);
+ }
+ }
+ }
+
+ if (updatedEntity.getCustomAttributes() != null) {
+ // event coming from hook does not have custom attributes, such events must not remove existing attributes
+ // UI sends empty object in case of intended removal.
+ Map<String, String> newCustomAttributes = updatedEntity.getCustomAttributes();
+ Map<String, String> currCustomAttributes = (storedEntity != null) ? storedEntity.getCustomAttributes() : getCustomAttributes(storedVertex);
+
+ if (!Objects.equals(currCustomAttributes, newCustomAttributes)) {
+ diffEntity.setCustomAttributes(newCustomAttributes);
+
+ hasDiffInCustomAttributes = true;
+ sectionsWithDiff++;
+
+ if (findOnlyFirstDiff && sectionsWithDiff > 1) {
+ return new AtlasEntityDiffResult(diffEntity, true, false, false);
+ }
+ }
+ }
+
+ if (!skipBusinessAttributeCompare) {
+ Map<String, Map<String, Object>> newBusinessMetadata = updatedEntity.getBusinessAttributes();
+ Map<String, Map<String, Object>> currBusinessMetadata = (storedEntity != null) ? storedEntity.getBusinessAttributes() : entityRetriever.getBusinessMetadata(storedVertex);;
+
+ if (!Objects.equals(currBusinessMetadata, newBusinessMetadata)) {
+ diffEntity.setBusinessAttributes(newBusinessMetadata);
+
+ hasDiffInBusinessAttributes = true;
+ sectionsWithDiff++;
+
+ if (findOnlyFirstDiff && sectionsWithDiff > 1) {
+ return new AtlasEntityDiffResult(diffEntity, true, false, false);
+ }
+ }
+ }
+
+ return new AtlasEntityDiffResult(diffEntity, sectionsWithDiff > 0, sectionsWithDiff == 1 && hasDiffInCustomAttributes, sectionsWithDiff == 1 && hasDiffInBusinessAttributes);
+ }
+
+ public static class AtlasEntityDiffResult {
+ private final AtlasEntity diffEntity;
+ private final boolean hasDifference;
+ private final boolean hasDifferenceOnlyInCustomAttributes;
+ private final boolean hasDifferenceOnlyInBusinessAttributes;
+
+ AtlasEntityDiffResult(AtlasEntity diffEntity, boolean hasDifference, boolean hasDifferenceOnlyInCustomAttributes, boolean hasDifferenceOnlyInBusinessAttributes) {
+ this.diffEntity = diffEntity;
+ this.hasDifference = hasDifference;
+ this.hasDifferenceOnlyInCustomAttributes = hasDifferenceOnlyInCustomAttributes;
+ this.hasDifferenceOnlyInBusinessAttributes = hasDifferenceOnlyInBusinessAttributes;
+ }
+
+ public AtlasEntity getDiffEntity() {
+ return diffEntity;
+ }
+
+ public boolean hasDifference() {
+ return hasDifference;
+ }
+
+ public boolean hasDifferenceOnlyInCustomAttributes() {
+ return hasDifferenceOnlyInCustomAttributes;
+ }
+
+ public boolean hasDifferenceOnlyInBusinessAttributes() {
+ return hasDifferenceOnlyInBusinessAttributes;
+ }
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 2440722..ce58e9a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.DeleteType;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
@@ -48,7 +49,7 @@ import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
-import org.apache.atlas.DeleteType;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityComparator.AtlasEntityDiffResult;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
import org.apache.atlas.type.AtlasClassificationType;
@@ -85,11 +86,11 @@ import java.util.Objects;
import java.util.Set;
import static java.lang.Boolean.FALSE;
+import static org.apache.atlas.AtlasConfiguration.STORE_DIFFERENTIAL_AUDITS;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PURGE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
-import static org.apache.atlas.repository.graph.GraphHelper.getCustomAttributes;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
import static org.apache.atlas.repository.store.graph.v2.EntityGraphMapper.validateLabels;
@@ -100,12 +101,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("store.EntityStore");
- private final AtlasGraph graph;
- private final DeleteHandlerDelegate deleteDelegate;
- private final AtlasTypeRegistry typeRegistry;
+ private final AtlasGraph graph;
+ private final DeleteHandlerDelegate deleteDelegate;
+ private final AtlasTypeRegistry typeRegistry;
private final IAtlasEntityChangeNotifier entityChangeNotifier;
- private final EntityGraphMapper entityGraphMapper;
- private final EntityGraphRetriever entityRetriever;
+ private final EntityGraphMapper entityGraphMapper;
+ private final EntityGraphRetriever entityRetriever;
+ private final boolean storeDifferentialAudits;
@Inject
public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry,
@@ -116,6 +118,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
this.entityChangeNotifier = entityChangeNotifier;
this.entityGraphMapper = entityGraphMapper;
this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
+ this.storeDifferentialAudits = STORE_DIFFERENTIAL_AUDITS.getBoolean();
}
@Override
@@ -1111,7 +1114,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
}
- private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications, boolean replaceBusinessAttributes) throws AtlasBaseException {
+ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications, boolean replaceBusinessAttributes) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createOrUpdate()");
}
@@ -1143,130 +1146,55 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) {
MetricRecorder checkForUnchangedEntities = RequestContext.get().startMetricRecord("checkForUnchangedEntities");
- List<AtlasEntity> entitiesToSkipUpdate = null;
+ List<AtlasEntity> entitiesToSkipUpdate = new ArrayList<>();
+ AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), !replaceClassifications, !replaceBusinessAttributes);
+ RequestContext reqContext = RequestContext.get();
for (AtlasEntity entity : context.getUpdatedEntities()) {
- String guid = entity.getGuid();
- AtlasVertex vertex = context.getVertex(guid);
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
- boolean hasUpdates = false, hasUpdatesInCustAttr = false, hasUpdatesInBusAttr = false;
-
- if (!hasUpdates) {
- hasUpdates = entity.getStatus() == AtlasEntity.Status.DELETED; // entity status could be updated during import
- }
-
- if (!hasUpdates && MapUtils.isNotEmpty(entity.getAttributes())) { // check for attribute value change
- for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
- if (!entity.getAttributes().containsKey(attribute.getName())) { // if value is not provided, current value will not be updated
- continue;
- }
-
- Object newVal = entity.getAttribute(attribute.getName());
- Object currVal = entityRetriever.getEntityAttribute(vertex, attribute);
-
- if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) {
- hasUpdates = true;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("found attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal);
- }
-
- break;
- }
- }
- }
-
- if (!hasUpdates && MapUtils.isNotEmpty(entity.getRelationshipAttributes())) { // check of relationsship-attribute value change
- for (String attributeName : entityType.getRelationshipAttributes().keySet()) {
- if (!entity.getRelationshipAttributes().containsKey(attributeName)) { // if value is not provided, current value will not be updated
- continue;
- }
-
- Object newVal = entity.getRelationshipAttribute(attributeName);
- String relationshipType = AtlasEntityUtil.getRelationshipType(newVal);
- AtlasAttribute attribute = entityType.getRelationshipAttribute(attributeName, relationshipType);
- Object currVal = entityRetriever.getEntityAttribute(vertex, attribute);
-
- if (!attribute.getAttributeType().areEqualValues(currVal, newVal, context.getGuidAssignments())) {
- hasUpdates = true;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("found relationship attribute update: entity(guid={}, typeName={}), attrName={}, currValue={}, newValue={}", guid, entity.getTypeName(), attribute.getName(), currVal, newVal);
- }
-
- break;
- }
- }
- }
-
- if (entity.getCustomAttributes() != null) {
- Map<String, String> currCustomAttributes = getCustomAttributes(vertex);
- Map<String, String> newCustomAttributes = entity.getCustomAttributes();
-
- if (!Objects.equals(currCustomAttributes, newCustomAttributes)) {
- hasUpdatesInCustAttr = true;
- }
- }
-
- if (replaceBusinessAttributes) {
- Map<String, Map<String, Object>> currBusinessMetadata = entityRetriever.getBusinessMetadata(vertex);
- Map<String, Map<String, Object>> newBusinessMetadata = entity.getBusinessAttributes();
-
- if (!Objects.equals(currBusinessMetadata, newBusinessMetadata)) {
- hasUpdatesInBusAttr = true;
- }
+ if (entity.getStatus() == AtlasEntity.Status.DELETED) {// entity status could be updated during import
+ continue;
}
- // if classifications are to be replaced, then skip updates only when no change in classifications
- if (!hasUpdates && replaceClassifications) {
- List<AtlasClassification> newVal = entity.getClassifications();
- List<AtlasClassification> currVal = entityRetriever.getAllClassifications(vertex);
-
- if (!Objects.equals(currVal, newVal)) {
- hasUpdates = true;
+ AtlasVertex storedVertex = context.getVertex(entity.getGuid());
+ AtlasEntityDiffResult diffResult = entityComparator.getDiffResult(entity, storedVertex, !storeDifferentialAudits);
- if (LOG.isDebugEnabled()) {
- LOG.debug("found classifications update: entity(guid={}, typeName={}), currValue={}, newValue={}", guid, entity.getTypeName(), currVal, newVal);
- }
+ if (diffResult.hasDifference()) {
+ if (storeDifferentialAudits) {
+ diffResult.getDiffEntity().setGuid(entity.getGuid());
+ reqContext.cacheDifferentialEntity(diffResult.getDiffEntity());
}
- }
- if(!hasUpdates && (hasUpdatesInCustAttr || hasUpdatesInBusAttr)) {
- hasUpdates = true;
- if (hasUpdatesInCustAttr ^ hasUpdatesInBusAttr) {
- if(hasUpdatesInCustAttr) RequestContext.get().recordEntityWithCustomAttributeUpdate(entity.getGuid());
- if(hasUpdatesInBusAttr) RequestContext.get().recordEntityWithBusinessAttributeUpdate(entity.getGuid());
+ if (diffResult.hasDifferenceOnlyInCustomAttributes()) {
+ reqContext.recordEntityWithCustomAttributeUpdate(entity.getGuid());
}
- }
- if (!hasUpdates) {
- if (entitiesToSkipUpdate == null) {
- entitiesToSkipUpdate = new ArrayList<>();
+ if (diffResult.hasDifferenceOnlyInBusinessAttributes()) {
+ reqContext.recordEntityWithBusinessAttributeUpdate(entity.getGuid());
}
-
+ } else {
if (LOG.isDebugEnabled()) {
LOG.debug("skipping unchanged entity: {}", entity);
}
entitiesToSkipUpdate.add(entity);
- RequestContext.get().recordEntityToSkip(entity.getGuid());
+ reqContext.recordEntityToSkip(entity.getGuid());
}
}
- if (entitiesToSkipUpdate != null) {
+ if (entitiesToSkipUpdate.size() > 0) {
// remove entitiesToSkipUpdate from EntityMutationContext
context.getUpdatedEntities().removeAll(entitiesToSkipUpdate);
}
// Check if authorized to update entities
- if (!RequestContext.get().isImportInProgress()) {
+ if (!reqContext.isImportInProgress()) {
for (AtlasEntity entity : context.getUpdatedEntities()) {
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, new AtlasEntityHeader(entity)),
"update entity: type=", entity.getTypeName());
}
}
- RequestContext.get().endMetricRecord(checkForUnchangedEntities);
+ reqContext.endMetricRecord(checkForUnchangedEntities);
}
EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications, replaceBusinessAttributes);
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparatorTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparatorTest.java
new file mode 100644
index 0000000..7e4a7e2
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparatorTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestRelationshipUtilsV2;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.*;
+
+import static org.apache.atlas.TestUtilsV2.NAME;
+import static org.apache.atlas.type.AtlasTypeUtil.getAtlasObjectId;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class AtlasEntityComparatorTest extends AtlasEntityTestBase {
+
+ private AtlasEntityComparator comparator;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ super.setUp();
+
+ AtlasTypesDef[] testTypesDefs = new AtlasTypesDef[] {
+ TestUtilsV2.defineHiveTypes(),
+ TestRelationshipUtilsV2.getInverseReferenceTestTypes()
+ };
+ createTypesDef(testTypesDefs);
+
+ comparator = new AtlasEntityComparator(typeRegistry, null, null, false, false);
+ }
+
+ @AfterClass
+ public void clear() throws Exception {
+
+ }
+
+ @Test
+ public void testSameEntities () throws AtlasBaseException {
+ AtlasEntity dbEntity = TestUtilsV2.createDBEntity();
+ dbEntity.setAttribute("namespace", "db namespace");
+ dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+ AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);
+ AtlasEntity newTableEntity = TestUtilsV2.createTableEntity(dbEntity, tableEntity.getAttribute("name").toString());
+
+ tableEntity.setClassifications(toAtlasClassifications(new String[]{"test-1"}));
+ newTableEntity.setClassifications(toAtlasClassifications(new String[]{"test-1"}));
+
+ tableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar");}});
+ newTableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar");}});
+
+ AtlasEntityComparator.AtlasEntityDiffResult diffResult = comparator.getDiffResult(tableEntity, newTableEntity, true);
+ Assert.assertFalse(diffResult.hasDifference());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+
+ diffResult = comparator.getDiffResult(tableEntity, newTableEntity, false);
+ Assert.assertFalse(diffResult.hasDifference());
+ }
+
+ @Test
+ public void testChangedEntities () throws AtlasBaseException {
+ AtlasEntity dbEntity = TestUtilsV2.createDBEntity();
+ dbEntity.setAttribute("namespace", "db namespace");
+ dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+ AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);
+ AtlasEntity newTableEntity = TestUtilsV2.createTableEntity(dbEntity);
+
+ tableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar-1");}});
+ newTableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar-2");}});
+
+ AtlasEntityComparator.AtlasEntityDiffResult diffResult = comparator.getDiffResult(newTableEntity, tableEntity, true);
+ Assert.assertTrue(diffResult.hasDifference());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+
+ diffResult = comparator.getDiffResult(tableEntity, newTableEntity, false);
+ Assert.assertNotNull(diffResult.getDiffEntity());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+ }
+
+ @Test
+ public void testChangedEntitiesCustomAttribute () throws AtlasBaseException {
+ AtlasEntity dbEntity = TestUtilsV2.createDBEntity();
+ dbEntity.setAttribute("namespace", "db namespace");
+ dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+ AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);
+ AtlasEntity newTableEntity = TestUtilsV2.createTableEntity(dbEntity, tableEntity.getAttribute("name").toString());
+
+ tableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar-1");}});
+ newTableEntity.setCustomAttributes(new HashMap<String, String> (){{ put("foo","bar-2");}});
+
+ AtlasEntityComparator.AtlasEntityDiffResult diffResult = comparator.getDiffResult(newTableEntity, tableEntity, true);
+ Assert.assertTrue(diffResult.hasDifference());
+ Assert.assertTrue(diffResult.hasDifferenceOnlyInCustomAttributes());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+
+ diffResult = comparator.getDiffResult(tableEntity, newTableEntity, false);
+ Assert.assertNotNull(diffResult.getDiffEntity());
+ Assert.assertTrue(diffResult.hasDifferenceOnlyInCustomAttributes());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+ }
+
+ @Test
+ public void testChangedEntitiesClassification () throws AtlasBaseException {
+ AtlasEntity dbEntity = TestUtilsV2.createDBEntity();
+ dbEntity.setAttribute("namespace", "db namespace");
+ dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+ AtlasEntity tableEntity = TestUtilsV2.createTableEntity(dbEntity);
+ AtlasEntity newTableEntity = TestUtilsV2.createTableEntity(dbEntity, tableEntity.getAttribute("name").toString());
+
+ tableEntity.setClassifications(toAtlasClassifications(new String[]{"test-1"}));
+ newTableEntity.setClassifications(toAtlasClassifications(new String[]{"test-2"}));
+
+ AtlasEntityComparator.AtlasEntityDiffResult diffResult = comparator.getDiffResult(newTableEntity, tableEntity, true);
+ Assert.assertTrue(diffResult.hasDifference());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+
+ diffResult = comparator.getDiffResult(tableEntity, newTableEntity, false);
+ Assert.assertNotNull(diffResult.getDiffEntity());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInCustomAttributes());
+ Assert.assertFalse(diffResult.hasDifferenceOnlyInBusinessAttributes());
+ }
+
+ @Test
+ public void testChangedRelationshipAttributes () throws AtlasBaseException {
+ AtlasEntity a1 = new AtlasEntity("A");
+ a1.setAttribute(NAME, "a1_name");
+
+ AtlasEntity a2 = new AtlasEntity("A");
+ a2.setAttribute(NAME, "a2_name");
+
+ AtlasEntity a3 = new AtlasEntity("A");
+ a3.setAttribute(NAME, "a3_name");
+
+ AtlasEntity b1 = new AtlasEntity("B");
+ b1.setAttribute(NAME, "b_name");
+ b1.setRelationshipAttribute("manyA", ImmutableList.of(getAtlasObjectId(a1), getAtlasObjectId(a2)));
+
+ AtlasEntity b2 = new AtlasEntity("B");
+ b2.setAttribute(NAME, "b_name");
+ b2.setRelationshipAttribute("manyA", ImmutableList.of(getAtlasObjectId(a1), getAtlasObjectId(a2)));
+
+ AtlasEntity b3 = new AtlasEntity("B");
+ b3.setAttribute(NAME, "b_name");
+ b3.setRelationshipAttribute("manyA", ImmutableList.of(getAtlasObjectId(a3)));
+
+ AtlasEntity b4 = new AtlasEntity("B");
+ b4.setAttribute(NAME, "b_name");
+
+ Assert.assertFalse(comparator.getDiffResult(b1, b2, true).hasDifference());
+ Assert.assertTrue(comparator.getDiffResult(b1, b3, true).hasDifference());
+ Assert.assertTrue(comparator.getDiffResult(b1, b4, true).hasDifference());
+ Assert.assertTrue(comparator.getDiffResult(b3, b4, true).hasDifference());
+
+ Assert.assertFalse(comparator.getDiffResult(b1, b2, false).hasDifference());
+ Assert.assertTrue(comparator.getDiffResult(b1, b3, false).hasDifference());
+ Assert.assertTrue(comparator.getDiffResult(b1, b4, false).hasDifference());
+ Assert.assertTrue(comparator.getDiffResult(b3, b4, false).hasDifference());
+ }
+
+ private List<AtlasClassification> toAtlasClassifications(String[] classificationNames) {
+ List<AtlasClassification> ret = new ArrayList<>();
+
+ if (classificationNames != null) {
+ for (String classificationName : classificationNames) {
+ ret.add(new AtlasClassification(classificationName));
+ }
+ }
+
+ return ret;
+ }
+
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java
index c9f4912..9aced8e 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2Test.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v2;
import com.google.common.collect.ImmutableSet;
+import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
@@ -61,6 +62,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.atlas.AtlasConfiguration.STORE_DIFFERENTIAL_AUDITS;
import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_KEY_CHARACTERS;
import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_KEY_LENGTH;
import static org.apache.atlas.AtlasErrorCode.INVALID_CUSTOM_ATTRIBUTE_VALUE;
@@ -854,6 +856,45 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
}
@Test
+ public void testDifferentialEntitiesOnUpdate() throws Exception {
+     // With differential audits switched on, a create is expected to cache no differential
+     // entity, and a partial update is expected to cache exactly one differential entity
+     // that carries the updated attribute but not the untouched ones.
+     ApplicationProperties.get().setProperty(STORE_DIFFERENTIAL_AUDITS.getPropertyName(), true);
+
+     try {
+         init();
+
+         // test for entity attributes
+         AtlasEntity dbEntity = new AtlasEntity(TestUtilsV2.DATABASE_TYPE);
+         dbEntity.setAttribute("name", TestUtilsV2.randomString(10));
+         dbEntity.setAttribute("description", "us db");
+         dbEntity.setAttribute("namespace", "db namespace");
+         dbEntity.setAttribute("cluster", "Fenton_Cluster");
+
+         EntityStream dbStream = new AtlasEntityStream(new AtlasEntitiesWithExtInfo(dbEntity));
+         EntityMutationResponse response = entityStore.createOrUpdate(dbStream, false);
+         AtlasEntityHeader dbHeader = response.getFirstEntityCreated();
+         AtlasEntity createdDbEntity = getEntityFromStore(dbHeader);
+
+         assertEquals(RequestContext.get().getDifferentialEntities().size(), 0);
+
+         // update the db entity
+         dbEntity = new AtlasEntity(TestUtilsV2.DATABASE_TYPE);
+         dbEntity.setGuid(createdDbEntity.getGuid());
+         dbEntity.setAttribute("description", "new description");
+
+         dbStream = new AtlasEntityStream(new AtlasEntitiesWithExtInfo(dbEntity));
+
+         response = entityStore.createOrUpdate(dbStream, true);
+         dbHeader = response.getFirstEntityPartialUpdated();
+         AtlasEntity updatedDbEntity = getEntityFromStore(dbHeader);
+
+         assertEquals(RequestContext.get().getDifferentialEntities().size(), 1);
+
+         AtlasEntity diffEntity = RequestContext.get().getDifferentialEntity(updatedDbEntity.getGuid());
+
+         // report a missing cache entry as an assertion failure rather than a NullPointerException
+         assertNotNull(diffEntity);
+         assertNotNull(diffEntity.getAttribute("description"));
+         Assert.assertNull(diffEntity.getAttribute("namespace"));
+         Assert.assertNull(diffEntity.getAttribute("name"));
+     } finally {
+         // Reset the flag even when an assertion above fails, so the enabled setting
+         // cannot leak into tests that run afterwards.
+         // NOTE(review): init() is not re-run after the reset; confirm whether the store
+         // picks the flag up again on its own or needs re-initialisation.
+         ApplicationProperties.get().setProperty(STORE_DIFFERENTIAL_AUDITS.getPropertyName(), false);
+     }
+ }
+
+ @Test
public void testSetObjectIdAttrToNull() throws Exception {
final AtlasEntity dbEntity = TestUtilsV2.createDBEntity();
final AtlasEntity db2Entity = TestUtilsV2.createDBEntity();
@@ -1004,7 +1045,10 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
}
@Test (dependsOnMethods = "testCreate")
- public void addCustomAttributesToEntity() throws AtlasBaseException {
+ public void addCustomAttributesToEntity() throws Exception {
+
+ ApplicationProperties.get().setProperty(STORE_DIFFERENTIAL_AUDITS.getPropertyName(), true);
+ init();
AtlasEntity tblEntity = getEntityFromStore(tblEntityGuid);
Map<String, String> customAttributes = new HashMap<>();
@@ -1017,10 +1061,13 @@ public class AtlasEntityStoreV2Test extends AtlasEntityTestBase {
tblEntity.setCustomAttributes(customAttributes);
entityStore.createOrUpdate(new AtlasEntityStream(tblEntity), false);
+ assertEquals(RequestContext.get().getDifferentialEntities().size(), 1);
tblEntity = getEntityFromStore(tblEntityGuid);
assertEquals(customAttributes, tblEntity.getCustomAttributes());
+
+ ApplicationProperties.get().setProperty(STORE_DIFFERENTIAL_AUDITS.getPropertyName(), false);
}
@Test (dependsOnMethods = "addCustomAttributesToEntity")
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 216ba08..7de3536 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -53,6 +53,7 @@ public class RequestContext {
private final Map<String, AtlasEntityHeader> deletedEntities = new HashMap<>();
private final Map<String, AtlasEntity> entityCache = new HashMap<>();
private final Map<String, AtlasEntityWithExtInfo> entityExtInfoCache = new HashMap<>();
+ private final Map<String, AtlasEntity> diffEntityCache = new HashMap<>();
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
private final Map<String, List<AtlasClassification>> removedPropagations = new HashMap<>();
private final AtlasPerfMetrics metrics = isMetricsEnabled ? new AtlasPerfMetrics() : null;
@@ -114,6 +115,7 @@ public class RequestContext {
this.deletedEntities.clear();
this.entityCache.clear();
this.entityExtInfoCache.clear();
+ this.diffEntityCache.clear();
this.addedPropagations.clear();
this.removedPropagations.clear();
this.entitiesToSkipUpdate.clear();
@@ -344,6 +346,17 @@ public class RequestContext {
}
}
+ /**
+  * Stores the given entity in the differential-entity cache, keyed by its guid.
+  * A {@code null} entity, or an entity whose guid is {@code null}, is ignored.
+  *
+  * @param entity entity to cache; may be {@code null}
+  */
+ public void cacheDifferentialEntity(AtlasEntity entity) {
+     if (entity == null) {
+         return;
+     }
+
+     final String entityGuid = entity.getGuid();
+
+     if (entityGuid == null) {
+         return;
+     }
+
+     diffEntityCache.put(entityGuid, entity);
+ }
+
+ /**
+  * Looks up the entity held in the differential-entity cache for the given guid.
+  *
+  * @param guid key to look up
+  * @return the entity cached under {@code guid}, or {@code null} when the cache has no entry for it
+  */
+ public AtlasEntity getDifferentialEntity(String guid) {
+     final AtlasEntity cachedDiff = diffEntityCache.get(guid);
+
+     return cachedDiff;
+ }
+
+ /**
+  * Exposes every entity currently held in the differential-entity cache.
+  *
+  * @return the {@code values()} view of the cache map, not a copy
+  */
+ public Collection<AtlasEntity> getDifferentialEntities() {
+     return diffEntityCache.values();
+ }
+
public Collection<AtlasEntityHeader> getUpdatedEntities() {
return updatedEntities.values();