You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2019/01/22 04:38:35 UTC
[atlas] branch master updated: ATLAS-3020: Audit APIs for
classification updates.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new cb2421b ATLAS-3020: Audit APIs for classification updates.
new cd51cef Merge branch 'master' of https://gitbox.apache.org/repos/asf/atlas
cb2421b is described below
commit cb2421bc0eb4f6a9c2d9f6dadf3b9ba0080e3e82
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Mon Jan 21 20:34:50 2019 -0800
ATLAS-3020: Audit APIs for classification updates.
---
.../main/java/org/apache/atlas/AtlasClientV2.java | 21 +-
.../atlas/model/instance/AtlasEntityHeaders.java | 56 ++++
.../audit/CassandraBasedAuditRepository.java | 7 +
.../repository/audit/EntityAuditRepository.java | 9 +
.../audit/HBaseBasedAuditRepository.java | 52 +++-
.../audit/InMemoryEntityAuditRepository.java | 17 ++
.../audit/NoopEntityAuditRepository.java | 7 +
.../repository/store/graph/AtlasEntityStore.java | 3 +
.../store/graph/v2/AtlasEntityStoreV2.java | 7 +
.../store/graph/v2/ClassificationAssociator.java | 316 +++++++++++++++++++++
.../store/graph/v2/EntityGraphRetriever.java | 2 +-
.../graph/v2/ClassificationAssociatorTest.java | 235 +++++++++++++++
.../col-entity-None.json | 10 +
.../col-entity-PII-FIN_PII.json | 32 +++
.../classification-association/col-entity-PII.json | 22 ++
.../col-entity-T1-prop-Tn.json | 34 +++
.../classification-association/header-FIN_PII.json | 32 +++
.../classification-association/header-None.json | 21 ++
.../header-PII-VENDOR_PII.json | 42 +++
.../classification-association/header-PII.json | 32 +++
.../header-Tx-prop-T1.json | 42 +++
.../json/classification-association/header-Tx.json | 26 ++
.../classification-association/header-empty.json | 3 +
.../java/org/apache/atlas/web/rest/EntityREST.java | 59 +++-
24 files changed, 1081 insertions(+), 6 deletions(-)
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 7c8caee..33466e5 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -27,7 +27,9 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.lineage.AtlasLineageInfo;
import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection;
@@ -72,6 +74,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
// Relationships APIs
private static final String RELATIONSHIPS_URI = BASE_URI + "v2/relationship/";
+ private static final String BULK_HEADERS = "bulk/headers";
+ private static final String BULK_SET_CLASSIFICATIONS = "bulk/setClassifications";
public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
super(baseUrl, basicAuthUserNamePassword);
@@ -326,13 +330,26 @@ public class AtlasClientV2 extends AtlasBaseClient {
}
public void deleteClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException {
- callAPI(formatPathParameters(API_V2.GET_CLASSIFICATIONS, guid), AtlasClassifications.class, classifications);
+ for (AtlasClassification c : classifications) {
+ callAPI(formatPathParameters(API_V2.DELETE_CLASSIFICATION, guid, c.getTypeName()), AtlasClassifications.class, classifications);
+ }
}
public void deleteClassification(String guid, String classificationName) throws AtlasServiceException {
callAPI(formatPathParameters(API_V2.DELETE_CLASSIFICATION, guid, classificationName), null, null);
}
+ public AtlasEntityHeaders getEntityHeaders(long tagUpdateStartTime) throws AtlasServiceException {
+ MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
+ queryParams.add("tagUpdateStartTime", Long.toString(tagUpdateStartTime));
+
+ return callAPI(API_V2.GET_BULK_HEADERS, AtlasEntityHeaders.class, queryParams);
+ }
+
+ public String setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasServiceException {
+ return callAPI(API_V2.UPDATE_BULK_SET_CLASSIFICATIONS, String.class, entityHeaders);
+ }
+
/* Discovery calls */
public AtlasSearchResult dslSearch(final String query) throws AtlasServiceException {
MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
@@ -480,6 +497,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
public static final API_V2 DELETE_RELATIONSHIP_BY_GUID = new API_V2(RELATIONSHIPS_URI + "guid/", HttpMethod.DELETE, Response.Status.NO_CONTENT);
public static final API_V2 CREATE_RELATIONSHIP = new API_V2(RELATIONSHIPS_URI , HttpMethod.POST, Response.Status.OK);
public static final API_V2 UPDATE_RELATIONSHIP = new API_V2(RELATIONSHIPS_URI , HttpMethod.PUT, Response.Status.OK);
+ public static final API_V2 GET_BULK_HEADERS = new API_V2(ENTITY_API + BULK_HEADERS, HttpMethod.GET, Response.Status.OK);
+ public static final API_V2 UPDATE_BULK_SET_CLASSIFICATIONS = new API_V2(ENTITY_API + AtlasClientV2.BULK_SET_CLASSIFICATIONS, HttpMethod.POST, Response.Status.OK);
private API_V2(String path, String method, Response.Status status) {
super(path, method, status);
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeaders.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeaders.java
new file mode 100644
index 0000000..11c6fc7
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeaders.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.instance;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import java.util.Map;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class AtlasEntityHeaders {
+ Map<String, AtlasEntityHeader> guidHeaderMap;
+
+ public AtlasEntityHeaders() {
+ }
+
+ public AtlasEntityHeaders(Map<String, AtlasEntityHeader> guidEntityHeaderMap) {
+ guidHeaderMap = guidEntityHeaderMap;
+ }
+
+ public void setGuidHeaderMap(Map<String, AtlasEntityHeader> guidHeaderMap) {
+ this.guidHeaderMap = guidHeaderMap;
+ }
+
+ public Map<String, AtlasEntityHeader> getGuidHeaderMap() {
+ return guidHeaderMap;
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
index eb78f8f..b8131bd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
@@ -30,6 +30,7 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.inject.Singleton;
@@ -188,6 +190,11 @@ public class CassandraBasedAuditRepository extends AbstractStorageBasedAuditRepo
}
@Override
+ public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+ throw new NotImplementedException();
+ }
+
+ @Override
public void start() throws AtlasException {
initApplicationProperties();
initializeSettings();
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index aab2d5b..2a47e39 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -24,6 +24,7 @@ import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.exception.AtlasBaseException;
import java.util.List;
+import java.util.Set;
/**
* Interface for repository for storing entity audit events
@@ -77,6 +78,14 @@ public interface EntityAuditRepository {
*/
List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException;
+ /***
+ * List events for given time range where classifications have been added, deleted or updated.
+ * @param fromTimestamp from timestamp
+ * @param toTimestamp to timestamp
+ * @return events that match the range
+ * @throws AtlasBaseException
+ */
+ Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException;
/**
* List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 6f4415f..5f01293 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -50,7 +50,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -64,9 +67,10 @@ import javax.inject.Singleton;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD;
import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE;
@@ -546,6 +550,52 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
}
@Override
+ public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+ final String classificationUpdatesAction = "CLASSIFICATION_";
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Listing events for fromTimestamp {}, toTimestamp {}, action {}", fromTimestamp, toTimestamp);
+ }
+
+ Table table = null;
+ ResultScanner scanner = null;
+
+ try {
+ Set<String> guids = new HashSet<>();
+
+ table = connection.getTable(tableName);
+
+ byte[] filterValue = Bytes.toBytes(classificationUpdatesAction);
+ BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator(filterValue);
+ SingleColumnValueFilter filter = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
+ Scan scan = new Scan().setFilter(filter).setTimeRange(fromTimestamp, toTimestamp);
+
+ Result result;
+ scanner = table.getScanner(scan);
+ while ((result = scanner.next()) != null) {
+ EntityAuditEvent event = fromKey(result.getRow());
+
+ if (event == null) {
+ continue;
+ }
+
+ guids.add(event.getEntityId());
+ }
+
+ return guids;
+ } catch (IOException e) {
+ throw new AtlasBaseException(e);
+ } finally {
+ try {
+ close(scanner);
+ close(table);
+ } catch (AtlasException e) {
+ throw new AtlasBaseException(e);
+ }
+ }
+ }
+
+ @Override
public void start() throws AtlasException {
Configuration configuration = ApplicationProperties.get();
startInternal(configuration, getHBaseConfiguration(configuration));
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
index dca3b85..ad6ec94 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -21,6 +21,7 @@ package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
+import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
@@ -28,7 +29,9 @@ import org.springframework.stereotype.Component;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -119,6 +122,20 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
}
@Override
+ public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+ Set<String> events = new HashSet<>();
+
+ for (EntityAuditEventV2 event : auditEventsV2.values()) {
+ long timestamp = event.getTimestamp();
+ if (timestamp > fromTimestamp && timestamp <= toTimestamp) {
+ events.add(event.getEntityId());
+ }
+ }
+
+ return events;
+ }
+
+ @Override
public List<Object> listEvents(String entityId, String startKey, short maxResults) {
List events = listEventsV2(entityId, startKey, maxResults);
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
index e3a6078..4bb68d5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
@@ -20,12 +20,14 @@ package org.apache.atlas.repository.audit;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
+import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.springframework.stereotype.Component;
import javax.inject.Singleton;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
/**
* Implementation that completely disables the audit repository.
@@ -66,6 +68,11 @@ public class NoopEntityAuditRepository implements EntityAuditRepository {
}
@Override
+ public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+ return Collections.emptySet();
+ }
+
+ @Override
public List<Object> listEvents(String entityId, String startKey, short n) {
return Collections.emptyList();
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 750fa17..1da1138 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -24,6 +24,7 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v2.EntityStream;
@@ -216,4 +217,6 @@ public interface AtlasEntityStore {
List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException;
AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException;
+
+ String setClassifications(AtlasEntityHeaders entityHeaders);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 35aa3af..e8e5400 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -657,6 +657,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
return ret;
}
+ @Override
+ @GraphTransaction
+ public String setClassifications(AtlasEntityHeaders entityHeaders) {
+ ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(typeRegistry, this);
+ return associator.setClassifications(entityHeaders.getGuidHeaderMap());
+ }
+
private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createOrUpdate()");
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java
new file mode 100644
index 0000000..11d008f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Component
+public class ClassificationAssociator {
+ private static final Logger LOG = LoggerFactory.getLogger(ClassificationAssociator.class);
+
+ public static class Retriever {
+ private final EntityAuditRepository auditRepository;
+ private final EntityGraphRetriever entityRetriever;
+
+ public Retriever(AtlasTypeRegistry typeRegistry, EntityAuditRepository auditRepository) {
+ this.entityRetriever = new EntityGraphRetriever(typeRegistry);
+ this.auditRepository = auditRepository;
+ }
+
+ Retriever(EntityGraphRetriever entityGraphRetriever, EntityAuditRepository auditRepository) {
+ this.entityRetriever = entityGraphRetriever;
+ this.auditRepository = auditRepository;
+ }
+
+ public AtlasEntityHeaders get(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
+ toTimestamp = incrementTimestamp(toTimestamp);
+ Set<String> guids = auditRepository.getEntitiesWithTagChanges(fromTimestamp, toTimestamp);
+
+ Map<String, AtlasEntityHeader> guidEntityHeaderMap = new HashMap<>();
+ for (String guid : guids) {
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+
+ guidEntityHeaderMap.put(guid, entityHeader);
+ }
+
+ guids.clear();
+ return new AtlasEntityHeaders(guidEntityHeaderMap);
+ }
+
+ private long incrementTimestamp(long t) {
+ return t + 1;
+ }
+ }
+
+ public static class Updater {
+ static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName";
+ static final String STATUS_DONE = "(Done)";
+ static final String STATUS_SKIPPED = "(Skipped)";
+ static final String STATUS_PARTIAL = "(Partial)";
+
+ private static final String PROCESS_FORMAT = "%s:%s:%s:%s -> %s:%s";
+ static final String PROCESS_ADD = "Add";
+ static final String PROCESS_UPDATE = "Update";
+ static final String PROCESS_DELETE = "Delete";
+ static final String JSONIFY_STRING_FORMAT = "\"%s\",";
+
+ private final AtlasTypeRegistry typeRegistry;
+ private final AtlasEntityStore entitiesStore;
+ private final EntityGraphRetriever entityRetriever;
+ private StringBuilder actionSummary = new StringBuilder();
+
+ public Updater(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
+ this.typeRegistry = typeRegistry;
+ this.entitiesStore = entitiesStore;
+ entityRetriever = new EntityGraphRetriever(typeRegistry);
+ }
+
+ public String setClassifications(Map<String, AtlasEntityHeader> map) {
+ for (AtlasEntityHeader incomingEntityHeader : map.values()) {
+ String typeName = incomingEntityHeader.getTypeName();
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+ if (entityType == null) {
+ LOG.warn("Entity type: {}: Not found: {}!", typeName, STATUS_SKIPPED);
+ summarizeFormat("%s: %s", typeName, STATUS_SKIPPED);
+ continue;
+ }
+
+ String qualifiedName = getQualifiedName(incomingEntityHeader);
+ AtlasEntityHeader entityToBeChanged = getByUniqueAttributes(entityType, qualifiedName, incomingEntityHeader.getAttributes());
+ if (entityToBeChanged == null) {
+ summarizeFormat("Entity:%s:%s:[Not found]:%s", entityType.getTypeName(), qualifiedName, STATUS_SKIPPED);
+ continue;
+ }
+
+
+ String guid = entityToBeChanged.getGuid();
+ Map<String, List<AtlasClassification>> operationListMap = computeChanges(incomingEntityHeader, entityToBeChanged);
+ commitChanges(guid, typeName, qualifiedName, operationListMap);
+ }
+
+ return getJsonArray(actionSummary);
+ }
+
+ private void commitChanges(String entityGuid, String typeName, String qualifiedName,
+ Map<String, List<AtlasClassification>> operationListMap) {
+ if (MapUtils.isEmpty(operationListMap)) {
+ return;
+ }
+
+ deleteClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_DELETE));
+ updateClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_UPDATE));
+ addClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_ADD));
+
+ operationListMap.clear();
+ }
+
+ private Map<String, List<AtlasClassification>> computeChanges(AtlasEntityHeader incomingEntityHeader, AtlasEntityHeader entityToBeUpdated) {
+ if (incomingEntityHeader == null || entityToBeUpdated == null) {
+ return null;
+ }
+
+ ListOps<AtlasClassification> listOps = new ListOps<>();
+ List<AtlasClassification> incomingClassifications = listOps.filter(incomingEntityHeader.getGuid(), incomingEntityHeader.getClassifications());
+ List<AtlasClassification> entityClassifications = listOps.filter(entityToBeUpdated.getGuid(), entityToBeUpdated.getClassifications());
+
+ if (CollectionUtils.isEmpty(incomingClassifications) && CollectionUtils.isEmpty(entityClassifications)) {
+ return null;
+ }
+
+ Map<String, List<AtlasClassification>> operationListMap = new HashMap<>();
+
+ bucket(PROCESS_DELETE, operationListMap, listOps.subtract(entityClassifications, incomingClassifications));
+ bucket(PROCESS_UPDATE, operationListMap, listOps.intersect(incomingClassifications, entityClassifications));
+ bucket(PROCESS_ADD, operationListMap, listOps.subtract(incomingClassifications, entityClassifications));
+
+ return operationListMap;
+ }
+
+ private void bucket(String op, Map<String, List<AtlasClassification>> operationListMap, List<AtlasClassification> results) {
+ if (CollectionUtils.isEmpty(results)) {
+ return;
+ }
+
+ operationListMap.put(op, results);
+ }
+
+ private void addClassifications(String entityToBeChangedGuid, String typeName, String qualifiedName, List<AtlasClassification> list) {
+ if (CollectionUtils.isEmpty(list)) {
+ return;
+ }
+
+ String status = STATUS_DONE;
+ String classificationNames = getClassificationNames(list);
+ try {
+ entitiesStore.addClassifications(entityToBeChangedGuid, list);
+ } catch (AtlasBaseException e) {
+ status = STATUS_PARTIAL;
+ LOG.warn("{}:{}:{} -> {}: {}.", PROCESS_UPDATE, typeName, qualifiedName, classificationNames, status);
+ }
+
+ summarize(PROCESS_ADD, entityToBeChangedGuid, typeName, qualifiedName, classificationNames, status);
+ }
+
+ private void updateClassifications(String entityToBeChangedGuid, String typeName, String qualifiedName, List<AtlasClassification> list) {
+ if (CollectionUtils.isEmpty(list)) {
+ return;
+ }
+
+ String status = STATUS_DONE;
+ String classificationNames = getClassificationNames(list);
+
+ try {
+ entitiesStore.updateClassifications(entityToBeChangedGuid, list);
+ } catch (AtlasBaseException e) {
+ status = STATUS_PARTIAL;
+ LOG.warn("{}:{}:{} -> {}: {}.", PROCESS_UPDATE, typeName, qualifiedName, classificationNames, status);
+ }
+
+ summarize(PROCESS_UPDATE, entityToBeChangedGuid, typeName, qualifiedName, classificationNames, status);
+ }
+
+ private void deleteClassifications(String typeName, String entityGuid, String qualifiedName, List<AtlasClassification> list) {
+ if (CollectionUtils.isEmpty(list)) {
+ return;
+ }
+
+ String status = STATUS_DONE;
+ String classificationTypeName = getClassificationNames(list);
+ for (AtlasClassification c : list) {
+ try {
+ entitiesStore.deleteClassification(entityGuid, c.getTypeName());
+ } catch (AtlasBaseException e) {
+ status = STATUS_PARTIAL;
+ LOG.warn("{}:{}:{} -> {}: Skipped!", entityGuid, typeName, qualifiedName, c.getTypeName());
+ }
+ }
+
+ summarize(PROCESS_DELETE, entityGuid, typeName, qualifiedName, classificationTypeName, status);
+ }
+
+ AtlasEntityHeader getByUniqueAttributes(AtlasEntityType entityType, String qualifiedName, Map<String, Object> attrValues) {
+ try {
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, attrValues);
+ if (vertex == null) {
+ return null;
+ }
+
+ return entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
+ } catch (AtlasBaseException e) {
+ LOG.warn("{}:{} could not be processed!", entityType, qualifiedName);
+ return null;
+ } catch (Exception ex) {
+ LOG.error("{}:{} could not be processed!", entityType, qualifiedName, ex);
+ return null;
+ }
+ }
+
+ private String getClassificationNames(List<AtlasClassification> list) {
+ return list.stream().map(AtlasClassification::getTypeName).collect(Collectors.joining(", "));
+ }
+
+ private String getQualifiedName(AtlasEntityHeader entityHeader) {
+ return (String) entityHeader.getAttribute(ATTR_NAME_QUALIFIED_NAME);
+ }
+
+ private void summarize(String... s) {
+ summarizeFormat(PROCESS_FORMAT, s);
+ }
+
+ private void summarizeFormat(String format, String... s) {
+ summarize(String.format(format, s));
+ }
+
+ private void summarize(String s) {
+ actionSummary.append(String.format(JSONIFY_STRING_FORMAT, s));
+ }
+
+ private String getJsonArray(StringBuilder actionSummary) {
+ return "[" + StringUtils.removeEnd(actionSummary.toString(), ",") + "]";
+ }
+ }
+
+ private static class ListOps<V extends AtlasClassification> {
+ public List<V> intersect(List<V> lhs, List<V> rhs) {
+ if (CollectionUtils.isEmpty(rhs)) {
+ return null;
+ }
+
+ List<V> result = new ArrayList<>();
+ for (V c : rhs) {
+ V found = findFrom(lhs, c);
+ if (found != null) {
+ result.add(found);
+ }
+ }
+
+ return result;
+ }
+
+ public List<V> subtract(List<V> lhs, List<V> rhs) {
+ if (CollectionUtils.isEmpty(lhs)) {
+ return null;
+ }
+
+ List<V> result = new ArrayList<>();
+ for (V c : lhs) {
+ V found = findFrom(rhs, c);
+ if (found == null) {
+ result.add(c);
+ }
+ }
+
+ return result;
+ }
+
+ private V findFrom(List<V> reference, V check) {
+ return (V) CollectionUtils.find(reference, ox ->
+ ((V) ox).getTypeName().equals(check.getTypeName()));
+ }
+
+ public List<V> filter(String guid, List<V> list) {
+ if (CollectionUtils.isEmpty(list)) {
+ return list;
+ }
+
+ return list.stream().filter(x -> x.getEntityGuid().equals(guid)).collect(Collectors.toList());
+ }
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 066abc1..79216d5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -95,7 +95,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
@Component
-public final class EntityGraphRetriever {
+public class EntityGraphRetriever {
private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class);
private static final String TERM_RELATION_NAME = "AtlasGlossarySemanticAssignment";
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociatorTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociatorTest.java
new file mode 100644
index 0000000..ab5bb2b
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociatorTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_ADD;
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_DELETE;
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_UPDATE;
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.STATUS_DONE;
+import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.STATUS_SKIPPED;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.FileAssert.fail;
+
+public class ClassificationAssociatorTest {
+ private static final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8";
+ private static String TEST_FILES_SUBDIR = "classification-association";
+ private static String MESSAGE_SEPARATOR = ":";
+ private static String ENTITY_NAME_SEPARATOR = "->";
+
+ private class ClassificationAssociatorUpdaterForSpy extends ClassificationAssociator.Updater {
+ private final String entityFileName;
+
+ public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) {
+ super(typeRegistry, entitiesStore);
+ this.entityFileName = StringUtils.EMPTY;
+ }
+
+ public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore, String entityFileName) {
+ super(typeRegistry, entitiesStore);
+ this.entityFileName = entityFileName;
+ }
+
+ @Override
+ AtlasEntityHeader getByUniqueAttributes(AtlasEntityType entityType, String qualifiedName, Map<String, Object> attrValues) {
+ try {
+ if (StringUtils.isEmpty(entityFileName)) {
+ return null;
+ }
+
+ return getEntityHeaderFromFile(entityFileName);
+ } catch (IOException e) {
+ fail(entityFileName + " could not be loaded.");
+ return null;
+ }
+ }
+ }
+
+ @Test
+ public void auditScanYieldsNothing_EmptyHeadersReturned() {
+ AtlasEntityHeaders actualEntityHeaders = setupRetriever("header-empty", 0, 0, null);
+
+ assertNotNull(actualEntityHeaders);
+ assertEquals(actualEntityHeaders.getGuidHeaderMap().size(),0);
+ }
+
+ @Test
+ public void auditScanYieldsOneEntity_EntityHeadersHasOneElementWithClassification() {
+ AtlasEntityHeaders actualEntityHeaders = setupRetriever("header-Tx", 0, 0, TABLE_GUID);
+
+ assertNotNull(actualEntityHeaders);
+ assertEquals(actualEntityHeaders.getGuidHeaderMap().size(), 1);
+ assertTrue(actualEntityHeaders.getGuidHeaderMap().containsKey(TABLE_GUID));
+ assertEquals(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getGuid(), TABLE_GUID);
+ assertNotNull(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getClassifications());
+ assertEquals(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getClassifications().size(), 1);
+ }
+
+ private AtlasEntityHeaders setupRetriever(String headersFile, int fromTimestamp, int toTimestamp, final String tableGuid) {
+ AtlasEntityHeader entityHeaderWithClassification = null;
+ try {
+ Set<String> guids = new HashSet<String>();
+ entityHeaderWithClassification = TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, headersFile, AtlasEntityHeader.class);
+ if (!StringUtils.isEmpty(tableGuid)) {
+ guids.add(tableGuid);
+ }
+
+ EntityAuditRepository auditRepository = mock(EntityAuditRepository.class);
+ when(auditRepository.getEntitiesWithTagChanges(anyLong(), anyLong())).thenReturn(guids);
+
+ EntityGraphRetriever entityGraphRetriever = mock(EntityGraphRetriever.class);
+ when(entityGraphRetriever.toAtlasEntityHeaderWithClassifications(TABLE_GUID)).thenReturn(entityHeaderWithClassification);
+
+ ClassificationAssociator.Retriever retriever = new ClassificationAssociator.Retriever(entityGraphRetriever, auditRepository);
+ return retriever.get(fromTimestamp, toTimestamp);
+ }
+ catch (Exception ex) {
+ fail("Exception!");
+ return null;
+ }
+ }
+
+ @Test
+ public void updaterIncorrectType_ReturnsError() throws IOException {
+ AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile("header-PII");
+ AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
+
+ AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
+ when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(null);
+
+ ClassificationAssociator.Updater updater = new ClassificationAssociator.Updater(typeRegistry, entitiesStore);
+ String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
+
+ assertTrue(summary.contains("hive_"));
+ assertTrue(summary.contains(STATUS_SKIPPED));
+ }
+
+ @Test
+ public void updaterCorrectTypeEntityNotFound_Skipped() throws IOException {
+ AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile("header-PII");
+ AtlasEntityType hiveTable = mock(AtlasEntityType.class);
+ AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
+ AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
+
+ when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable);
+ when(hiveTable.getTypeName()).thenReturn("hive_column");
+
+ ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore);
+ String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
+
+ TypeReference<String[]> typeReference = new TypeReference<String[]>() {};
+ String[] summaryArray = AtlasJson.fromJson(summary, typeReference);
+ assertEquals(summaryArray.length, 1);
+ assertSummaryElement(summaryArray[0], "Entity", STATUS_SKIPPED, "");
+ }
+
+ @Test
+ public void updaterTests() throws IOException {
+ updaterAssert("header-PII", "col-entity-None", PROCESS_ADD + ":PII");
+ updaterAssert("header-PII", "col-entity-PII", new String[]{PROCESS_UPDATE + ":PII"});
+ updaterAssert("header-None", "col-entity-PII", new String[]{PROCESS_DELETE + ":PII"});
+ updaterAssert("header-PII-VENDOR_PII", "col-entity-PII-FIN_PII",
+ PROCESS_DELETE + ":FIN_PII",
+ PROCESS_UPDATE + ":PII",
+ PROCESS_ADD + ":VENDOR_PII");
+ updaterAssert("header-None", "col-entity-None", new String[]{});
+ updaterAssert("header-FIN_PII", "col-entity-PII",
+ PROCESS_DELETE + ":PII",
+ PROCESS_ADD + ":FIN_PII");
+ }
+
+ @Test
+ public void updater_filterPropagatedClassifications() throws IOException {
+ updaterAssert("header-Tx-prop-T1", "col-entity-T1-prop-Tn",
+ PROCESS_DELETE + ":T1",
+ PROCESS_ADD + ":Tx");
+ }
+
+
+ private void assertSummaryElement(String summaryElement, String operation, String status, String classificationName) {
+ String[] splits = StringUtils.split(summaryElement, MESSAGE_SEPARATOR);
+ String[] nameSplits = StringUtils.split(splits[3], ENTITY_NAME_SEPARATOR);
+ if (nameSplits.length > 1) {
+ assertEquals(nameSplits[1].trim(), classificationName);
+ }
+
+ assertEquals(splits[0], operation);
+ assertEquals(splits[4], status);
+ }
+
+ private String[] setupUpdater(String entityHeaderFileName, String entityFileName, int expectedSummaryLength) throws IOException {
+ AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile(entityHeaderFileName);
+
+ AtlasEntityType hiveTable = mock(AtlasEntityType.class);
+ AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class);
+ AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class);
+
+ when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable);
+ when(hiveTable.getTypeName()).thenReturn("hive_column");
+
+ ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore, entityFileName);
+ String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap());
+
+ TypeReference<String[]> typeReference = new TypeReference<String[]>() {};
+ String[] summaryArray = AtlasJson.fromJson(summary, typeReference);
+ assertEquals(summaryArray.length, expectedSummaryLength);
+
+ return summaryArray;
+ }
+
+ private AtlasEntityHeader getEntityHeaderFromFile(String entityJson) throws IOException {
+ return TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, entityJson, AtlasEntityHeader.class);
+ }
+
+ private AtlasEntityHeaders getEntityHeaderMapFromFile(String filename) throws IOException {
+ return TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, filename, AtlasEntityHeaders.class);
+ }
+
+ private void updaterAssert(String incoming, String entity, String... opNamePair) throws IOException {
+ String[] summary = setupUpdater(incoming, entity, opNamePair.length);
+
+ for (int i = 0; i < opNamePair.length; i++) {
+ String s = opNamePair[i];
+ String[] splits = StringUtils.split(s, ":");
+ assertSummaryElement(summary[i], splits[0], STATUS_DONE, splits[1]);
+ }
+ }
+}
diff --git a/repository/src/test/resources/json/classification-association/col-entity-None.json b/repository/src/test/resources/json/classification-association/col-entity-None.json
new file mode 100644
index 0000000..6c04a3d
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/col-entity-None.json
@@ -0,0 +1,10 @@
+{
+ "typeName": "hive_column",
+ "attributes": {
+ "owner": "hive",
+ "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+ "name": "nationalid"
+ },
+ "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "status": "ACTIVE"
+}
diff --git a/repository/src/test/resources/json/classification-association/col-entity-PII-FIN_PII.json b/repository/src/test/resources/json/classification-association/col-entity-PII-FIN_PII.json
new file mode 100644
index 0000000..283d863
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/col-entity-PII-FIN_PII.json
@@ -0,0 +1,32 @@
+{
+ "typeName": "hive_column",
+ "attributes": {
+ "owner": "hive",
+ "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+ "name": "nationalid"
+ },
+ "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "status": "ACTIVE",
+ "classifications": [
+ {
+ "typeName": "PII",
+ "attributes": {
+ "type": "ssn"
+ },
+ "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "entityStatus": "ACTIVE",
+ "propagate": true,
+ "removePropagationsOnEntityDelete": false
+ },
+ {
+ "typeName": "FIN_PII",
+ "attributes": {
+ "type": "ssn"
+ },
+ "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "entityStatus": "ACTIVE",
+ "propagate": true,
+ "removePropagationsOnEntityDelete": false
+ }
+ ]
+}
diff --git a/repository/src/test/resources/json/classification-association/col-entity-PII.json b/repository/src/test/resources/json/classification-association/col-entity-PII.json
new file mode 100644
index 0000000..af34e7d
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/col-entity-PII.json
@@ -0,0 +1,22 @@
+{
+ "typeName": "hive_column",
+ "attributes": {
+ "owner": "hive",
+ "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+ "name": "nationalid"
+ },
+ "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "status": "ACTIVE",
+ "classifications": [
+ {
+ "typeName": "PII",
+ "attributes": {
+ "type": "ssn"
+ },
+ "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "entityStatus": "ACTIVE",
+ "propagate": true,
+ "removePropagationsOnEntityDelete": false
+ }
+ ]
+}
diff --git a/repository/src/test/resources/json/classification-association/col-entity-T1-prop-Tn.json b/repository/src/test/resources/json/classification-association/col-entity-T1-prop-Tn.json
new file mode 100644
index 0000000..4f9cbd2
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/col-entity-T1-prop-Tn.json
@@ -0,0 +1,34 @@
+{
+ "typeName": "hive_column",
+ "attributes": {
+ "owner": "hive",
+ "createTime": 1547071410000,
+ "qualifiedName": "stocks.daily@cl1",
+ "name": "daily"
+ },
+ "guid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
+ "status": "ACTIVE",
+ "displayText": "daily",
+ "classifications": [
+ {
+ "typeName": "T1",
+ "attributes": {},
+ "entityGuid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
+ "entityStatus": "ACTIVE",
+ "propagate": false,
+ "validityPeriods": [],
+ "removePropagationsOnEntityDelete": false
+ },
+ {
+ "typeName": "Tn",
+ "attributes": {},
+ "entityGuid": "22222222-5555-40f8-a30f-3090b8a622f8",
+ "entityStatus": "ACTIVE",
+ "propagate": false,
+ "validityPeriods": [],
+ "removePropagationsOnEntityDelete": false
+ }
+ ],
+ "meaningNames": [],
+ "meanings": []
+}
diff --git a/repository/src/test/resources/json/classification-association/header-FIN_PII.json b/repository/src/test/resources/json/classification-association/header-FIN_PII.json
new file mode 100644
index 0000000..35bb6db
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-FIN_PII.json
@@ -0,0 +1,32 @@
+{
+ "guidHeaderMap": {
+ "0ce68113-77fe-4ed1-9585-69371202bd74": {
+ "typeName": "hive_column",
+ "attributes": {
+ "owner": "hive",
+ "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+ "name": "nationalid"
+ },
+ "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "status": "ACTIVE",
+ "displayText": "nationalid",
+ "classificationNames": [
+ "FIN_PII"
+ ],
+ "classifications": [
+ {
+ "typeName": "FIN_PII",
+ "attributes": {
+ "type": "ssn"
+ },
+ "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "entityStatus": "ACTIVE",
+ "propagate": true,
+ "removePropagationsOnEntityDelete": false
+ }
+ ],
+ "meaningNames": [],
+ "meanings": []
+ }
+ }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-None.json b/repository/src/test/resources/json/classification-association/header-None.json
new file mode 100644
index 0000000..a858990
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-None.json
@@ -0,0 +1,21 @@
+{
+ "guidHeaderMap": {
+ "0ce68113-77fe-4ed1-9585-69371202bd74": {
+ "typeName": "hive_column",
+ "attributes": {
+ "owner": "hive",
+ "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+ "name": "nationalid"
+ },
+ "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "status": "ACTIVE",
+ "displayText": "nationalid",
+ "classificationNames": [
+ ],
+ "classifications": [
+ ],
+ "meaningNames": [],
+ "meanings": []
+ }
+ }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-PII-VENDOR_PII.json b/repository/src/test/resources/json/classification-association/header-PII-VENDOR_PII.json
new file mode 100644
index 0000000..58638f7
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-PII-VENDOR_PII.json
@@ -0,0 +1,42 @@
+{
+ "guidHeaderMap": {
+ "0ce68113-77fe-4ed1-9585-69371202bd74": {
+ "typeName": "hive_column",
+ "attributes": {
+ "owner": "hive",
+ "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+ "name": "nationalid"
+ },
+ "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "status": "ACTIVE",
+ "displayText": "nationalid",
+ "classificationNames": [
+ "PII"
+ ],
+ "classifications": [
+ {
+ "typeName": "PII",
+ "attributes": {
+ "type": "ssn"
+ },
+ "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "entityStatus": "ACTIVE",
+ "propagate": true,
+ "removePropagationsOnEntityDelete": false
+ },
+ {
+ "typeName": "VENDOR_PII",
+ "attributes": {
+ "type": "ssn"
+ },
+ "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "entityStatus": "ACTIVE",
+ "propagate": true,
+ "removePropagationsOnEntityDelete": false
+ }
+ ],
+ "meaningNames": [],
+ "meanings": []
+ }
+ }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-PII.json b/repository/src/test/resources/json/classification-association/header-PII.json
new file mode 100644
index 0000000..bfc6d2e
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-PII.json
@@ -0,0 +1,32 @@
+{
+ "guidHeaderMap": {
+ "0ce68113-77fe-4ed1-9585-69371202bd74": {
+ "typeName": "hive_column",
+ "attributes": {
+ "owner": "hive",
+ "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+ "name": "nationalid"
+ },
+ "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "status": "ACTIVE",
+ "displayText": "nationalid",
+ "classificationNames": [
+ "PII"
+ ],
+ "classifications": [
+ {
+ "typeName": "PII",
+ "attributes": {
+ "type": "ssn"
+ },
+ "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "entityStatus": "ACTIVE",
+ "propagate": true,
+ "removePropagationsOnEntityDelete": false
+ }
+ ],
+ "meaningNames": [],
+ "meanings": []
+ }
+ }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-Tx-prop-T1.json b/repository/src/test/resources/json/classification-association/header-Tx-prop-T1.json
new file mode 100644
index 0000000..8f2f26f
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-Tx-prop-T1.json
@@ -0,0 +1,42 @@
+{
+ "guidHeaderMap": {
+ "0ce68113-77fe-4ed1-9585-69371202bd74": {
+ "typeName": "hive_column",
+ "attributes": {
+ "owner": "hive",
+ "qualifiedName": "hortoniabank.us_customers.nationalid@cl1",
+ "name": "nationalid"
+ },
+ "guid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "status": "ACTIVE",
+ "displayText": "nationalid",
+ "classificationNames": [
+ "T1", "Tx"
+ ],
+ "classifications": [
+ {
+ "typeName": "Tx",
+ "attributes": {
+ "type": "ssn"
+ },
+ "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74",
+ "entityStatus": "ACTIVE",
+ "propagate": true,
+ "removePropagationsOnEntityDelete": false
+ },
+ {
+ "typeName": "T1",
+ "attributes": {
+ "type": "ssn"
+ },
+ "entityGuid": "22222222-77fe-4ed1-9585-69371202bd74",
+ "entityStatus": "ACTIVE",
+ "propagate": true,
+ "removePropagationsOnEntityDelete": false
+ }
+ ],
+ "meaningNames": [],
+ "meanings": []
+ }
+ }
+}
diff --git a/repository/src/test/resources/json/classification-association/header-Tx.json b/repository/src/test/resources/json/classification-association/header-Tx.json
new file mode 100644
index 0000000..bab1eaa
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-Tx.json
@@ -0,0 +1,26 @@
+{
+ "typeName": "hive_table",
+ "attributes": {
+ "owner": "hive",
+ "createTime": 1547071410000,
+ "qualifiedName": "stocks.daily@cl1",
+ "name": "daily"
+ },
+ "guid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
+ "status": "ACTIVE",
+ "displayText": "daily",
+ "classificationNames": [],
+ "classifications": [
+ {
+ "typeName": "Tx",
+ "attributes": {},
+ "entityGuid": "df122fc3-5555-40f8-a30f-3090b8a622f8",
+ "entityStatus": "ACTIVE",
+ "propagate": false,
+ "validityPeriods": [],
+ "removePropagationsOnEntityDelete": false
+ }
+ ],
+ "meaningNames": [],
+ "meanings": []
+}
\ No newline at end of file
diff --git a/repository/src/test/resources/json/classification-association/header-empty.json b/repository/src/test/resources/json/classification-association/header-empty.json
new file mode 100644
index 0000000..9c8f417
--- /dev/null
+++ b/repository/src/test/resources/json/classification-association/header-empty.json
@@ -0,0 +1,3 @@
+{
+ "guidHeaderMap": {}
+}
\ No newline at end of file
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
index 68c132c..713338e 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
@@ -26,10 +26,12 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.instance.ClassificationAssociateRequest;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.store.graph.v2.ClassificationAssociator;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
@@ -49,7 +51,16 @@ import org.springframework.stereotype.Service;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.util.ArrayList;
@@ -77,7 +88,6 @@ public class EntityREST {
private final EntityAuditRepository auditRepository;
private final AtlasInstanceConverter instanceConverter;
-
@Inject
public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore,
EntityAuditRepository auditRepository, AtlasInstanceConverter instanceConverter) {
@@ -601,7 +611,7 @@ public class EntityREST {
}
/**
- * Bulk API to create new entities or update existing entities in Atlas.
+ * Bulk API to create new entities or updates existing entities in Atlas.
* Existing entity is matched using its unique guid if supplied or by its unique attributes eg: qualifiedName
*/
@POST
@@ -708,6 +718,49 @@ public class EntityREST {
}
}
+ @GET
+ @Path("bulk/headers")
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public AtlasEntityHeaders getEntityHeaders(@QueryParam("tagUpdateStartTime") long tagUpdateStartTime) throws AtlasBaseException {
+ AtlasPerfTracer perf = null;
+
+ try {
+ long tagUpdateEndTime = System.currentTimeMillis();
+
+ if (tagUpdateStartTime > tagUpdateEndTime) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "fromTimestamp should be less than toTimestamp");
+ }
+
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.getEntityHeaders(" + tagUpdateStartTime + ", " + tagUpdateEndTime + ")");
+ }
+
+ ClassificationAssociator.Retriever associator = new ClassificationAssociator.Retriever(typeRegistry, auditRepository);
+ return associator.get(tagUpdateStartTime, tagUpdateEndTime);
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
+
+ @POST
+ @Path("bulk/setClassifications")
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ public String setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasBaseException {
+ AtlasPerfTracer perf = null;
+
+ try {
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.setClassifications()");
+ }
+
+ ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(typeRegistry, entitiesStore);
+ return associator.setClassifications(entityHeaders.getGuidHeaderMap());
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
+
private AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException {
AtlasEntityType ret = typeRegistry.getEntityTypeByName(typeName);