You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/01/19 18:02:20 UTC
[atlas] branch master updated: ATLAS-4094: Add support for column
sort in Entity Audit API
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new e6ee5b4 ATLAS-4094: Add support for column sort in Entity Audit API
e6ee5b4 is described below
commit e6ee5b43ee48f8d78ff84e222f50f36df4745c81
Author: Deep Singh <de...@gmail.com>
AuthorDate: Tue Jan 19 10:45:29 2021 -0600
ATLAS-4094: Add support for column sort in Entity Audit API
Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
.../atlas/model/audit/EntityAuditEventV2.java | 60 +++++++++++
.../audit/CassandraBasedAuditRepository.java | 5 +
.../repository/audit/EntityAuditRepository.java | 13 +++
.../audit/HBaseBasedAuditRepository.java | 117 ++++++++++++++++++++-
.../audit/InMemoryEntityAuditRepository.java | 16 +++
.../audit/NoopEntityAuditRepository.java | 5 +
.../repository/audit/AuditRepositoryTestBase.java | 33 ++++++
.../java/org/apache/atlas/web/rest/EntityREST.java | 9 +-
8 files changed, 255 insertions(+), 3 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
index 083acac..3afd27e 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -31,6 +31,8 @@ import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
import java.util.Objects;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
@@ -46,6 +48,10 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.EN
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class EntityAuditEventV2 implements Serializable, Clearable {
+ public static final String SORT_COLUMN_USER = "user";
+ public static final String SORT_COLUMN_ACTION = "action";
+ public static final String SORT_COLUMN_TIMESTAMP = "timestamp";
+
public enum EntityAuditType { ENTITY_AUDIT_V1, ENTITY_AUDIT_V2 }
public enum EntityAuditActionV2 {
@@ -282,4 +288,58 @@ public class EntityAuditEventV2 implements Serializable, Clearable {
return ret;
}
+
+ public static class UserComparator implements Comparator<EntityAuditEventV2> {
+ @Override
+ public int compare(EntityAuditEventV2 me, EntityAuditEventV2 other) {
+ int ret = String.CASE_INSENSITIVE_ORDER.compare(me.getUser(), other.getUser());
+
+ if (ret == 0) {
+ ret = Long.compare(me.getTimestamp(), other.getTimestamp());
+ }
+
+ return ret;
+ }
+ }
+
+ public static class ActionComparator implements Comparator<EntityAuditEventV2> {
+ @Override
+ public int compare(EntityAuditEventV2 me, EntityAuditEventV2 other) {
+ int ret = String.CASE_INSENSITIVE_ORDER.compare(me.getAction().toString(), other.getAction().toString());
+
+ if (ret == 0) {
+ ret = Long.compare(me.getTimestamp(), other.getTimestamp());
+ }
+
+ return ret;
+ }
+ }
+
+ public static class TimestampComparator implements Comparator<EntityAuditEventV2> {
+ @Override
+ public int compare(EntityAuditEventV2 me, EntityAuditEventV2 other) {
+ return Long.compare(me.getTimestamp(), other.getTimestamp());
+ }
+ }
+
+ public static void sortEvents(List<EntityAuditEventV2> events, String sortByColumn, boolean sortOrderDesc) {
+ final Comparator<EntityAuditEventV2> comparator;
+
+ switch (sortByColumn.toLowerCase()) {
+ case SORT_COLUMN_USER:
+ comparator = new EntityAuditEventV2.UserComparator();
+ break;
+
+ case SORT_COLUMN_ACTION:
+ comparator = new EntityAuditEventV2.ActionComparator();
+ break;
+
+ case SORT_COLUMN_TIMESTAMP:
+ default:
+ comparator = new EntityAuditEventV2.TimestampComparator();
+ break;
+ }
+
+ events.sort(sortOrderDesc ? comparator.reversed() : comparator);
+ }
}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
index 8a453fd..fb890fc 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
@@ -190,6 +190,11 @@ public class CassandraBasedAuditRepository extends AbstractStorageBasedAuditRepo
}
@Override
+ public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+ throw new NotImplementedException();
+ }
+
+ @Override
public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
throw new NotImplementedException();
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index 07784d1..bad2a89 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -79,6 +79,19 @@ public interface EntityAuditRepository {
*/
List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) throws AtlasBaseException;
+ /**
+ * List events for the given entity id in sorted order of given column. Returns n results
+ * @param entityId entity id
+ * @param auditAction operation to be used for search at HBase column
+ * @param sortByColumn name of column on which sorting is required
+ * @param sortOrderDesc flag to set sort order descending
+ * @param offset event list is truncated by removing offset number of items, used for pagination
+ * @param limit Max numbers of events to be returned
+ * @return list of events
+ * @throws AtlasBaseException
+ */
+ List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException;
+
/***
* List events for given time range where classifications have been added, deleted or updated.
* @param fromTimestamp from timestamp
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 9fca744..b061761 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -22,11 +22,13 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
+import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -226,11 +229,12 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
}
}
+ @Override
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditActionV2 auditAction, String startKey, short maxResultCount) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Listing events for entity id {}, operation {}, starting key{}, maximum result count {}", entityId, auditAction.toString(), startKey, maxResultCount);
}
-
+ AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listSortedEventsV2");
Table table = null;
ResultScanner scanner = null;
@@ -316,6 +320,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
try {
close(scanner);
close(table);
+ RequestContext.get().endMetricRecord(metric);
} catch (AtlasException e) {
throw new AtlasBaseException(e);
}
@@ -323,6 +328,116 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
}
@Override
+ public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})", entityId, auditAction, sortByColumn, offset, limit);
+ }
+
+ AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listEventsV2");
+
+ if (sortByColumn == null) {
+ sortByColumn = EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
+ }
+
+ if (offset < 0) {
+ offset = 0;
+ }
+
+ if (limit < 0) {
+ limit = 100;
+ }
+
+ try (Table table = connection.getTable(tableName)) {
+ /*
+ * HBase Does not support query with sorted results. To support this API inmemory sort has to be performed.
+ * Audit entry can potentially have entire entity dumped into it. Loading entire audit entries for an entity can be
+ * memory intensive. Therefore we load audit entries with limited columns first, perform sort on this light weight list,
+ * then get the relevant section by removing offsets and reducing to limits. With this reduced list we create
+ * MultiRowRangeFilter and then again scan the table to get all the columns from the table this time.
+ */
+ Scan scan = new Scan().setReversed(true)
+ .setCaching(DEFAULT_CACHING)
+ .setSmall(true)
+ .setStopRow(Bytes.toBytes(entityId))
+ .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
+ .addColumn(COLUMN_FAMILY, COLUMN_ACTION)
+ .addColumn(COLUMN_FAMILY, COLUMN_USER);
+
+ if (auditAction != null) {
+ Filter filterAction = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(auditAction.toString())));
+
+ scan.setFilter(filterAction);
+ }
+
+ List<EntityAuditEventV2> events = new ArrayList<>();
+
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ for (Result result = scanner.next(); result != null; result = scanner.next()) {
+ EntityAuditEventV2 event = fromKeyV2(result.getRow());
+
+ event.setUser(getResultString(result, COLUMN_USER));
+ event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));
+
+ events.add(event);
+ }
+ }
+
+ EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
+
+ events = events.subList(Math.min(events.size(), offset), Math.min(events.size(), offset + limit));
+
+ if (events.size() > 0) {
+ List<MultiRowRangeFilter.RowRange> ranges = new ArrayList<>();
+
+ events.forEach(e -> {
+ ranges.add(new MultiRowRangeFilter.RowRange(e.getEventKey(), true, e.getEventKey(), true));
+ });
+
+ scan = new Scan().setReversed(true)
+ .setCaching(DEFAULT_CACHING)
+ .setSmall(true)
+ .setStopRow(Bytes.toBytes(entityId))
+ .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
+ .setFilter(new MultiRowRangeFilter(ranges));
+
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ events = new ArrayList<>();
+
+ for (Result result = scanner.next(); result != null; result = scanner.next()) {
+ EntityAuditEventV2 event = fromKeyV2(result.getRow());
+
+ event.setUser(getResultString(result, COLUMN_USER));
+ event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));
+ event.setDetails(getResultString(result, COLUMN_DETAIL));
+
+ if (persistEntityDefinition) {
+ String colDef = getResultString(result, COLUMN_DEFINITION);
+
+ if (colDef != null) {
+ event.setEntityDefinition(colDef);
+ }
+ }
+
+ events.add(event);
+ }
+ }
+
+ EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #recored returned {}", entityId, auditAction, sortByColumn, offset, limit, events.size());
+ }
+
+ return events;
+ } catch (IOException e) {
+ throw new AtlasBaseException(e);
+ } finally {
+ RequestContext.get().endMetricRecord(metric);
+ }
+ }
+
+ @Override
public List<Object> listEvents(String entityId, String startKey, short maxResults) throws AtlasBaseException {
List ret = listEventsV2(entityId, null, startKey, maxResults);
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
index 900df02..51efff1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -102,6 +102,22 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
}
@Override
+ public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+ List<EntityAuditEventV2> events = new ArrayList<>();
+ SortedMap<String, EntityAuditEventV2> subMap = auditEventsV2.tailMap(entityId);
+ for (EntityAuditEventV2 event : subMap.values()) {
+ if (event.getEntityId().equals(entityId)) {
+ events.add(event);
+ }
+ }
+ EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
+ events = events.subList(
+ Math.min(events.size(), offset),
+ Math.min(events.size(), offset + limit));
+ return events;
+ }
+
+ @Override
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResults) {
List<EntityAuditEventV2> events = new ArrayList<>();
String myStartKey = startKey;
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
index ef9e259..c62bb7e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
@@ -63,6 +63,11 @@ public class NoopEntityAuditRepository implements EntityAuditRepository {
}
@Override
+ public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+ return Collections.emptyList();
+ }
+
+ @Override
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) {
return Collections.emptyList();
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
index bf4f395..679df3c 100644
--- a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
@@ -158,6 +158,39 @@ public class AuditRepositoryTestBase {
assertEquals(events.size(), 0);
}
+
+ @Test
+ public void testSortListV2() throws Exception {
+ String id1 = "id1" + rand();
+ long ts = System.currentTimeMillis();
+ AtlasEntity entity = new AtlasEntity(rand());
+ List<EntityAuditEventV2> expectedEvents = new ArrayList<>(3);
+
+ expectedEvents.add(new EntityAuditEventV2(id1, ts, "user-a", EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details-1", entity));
+ expectedEvents.add(new EntityAuditEventV2(id1, ts+1, "user-C", EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details-2", entity));
+ expectedEvents.add(new EntityAuditEventV2(id1, ts+2, "User-b", EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE, "details-3", entity));
+ for(EntityAuditEventV2 event : expectedEvents) {
+ eventRepository.putEventsV2(event);
+ }
+
+ List<EntityAuditEventV2> events = eventRepository.listEventsV2(id1, null, "timestamp", false, 0, (short) 2);
+ assertEquals(events.size(), 2);
+ assertEventV2Equals(events.get(0), expectedEvents.get(0));
+ assertEventV2Equals(events.get(1), expectedEvents.get(1));
+
+ events = eventRepository.listEventsV2(id1, null, "user", false, 0, (short) 3);
+ assertEquals(events.size(), 3);
+ assertEventV2Equals(events.get(0), expectedEvents.get(0));
+ assertEventV2Equals(events.get(1), expectedEvents.get(2));
+ assertEventV2Equals(events.get(2), expectedEvents.get(1));
+
+ events = eventRepository.listEventsV2(id1, null, "action", false, 0, (short) 3);
+ assertEquals(events.size(), 3);
+ assertEventV2Equals(events.get(0), expectedEvents.get(2));
+ assertEventV2Equals(events.get(1), expectedEvents.get(1));
+ assertEventV2Equals(events.get(2), expectedEvents.get(0));
+
+ }
protected void assertEventV2Equals(EntityAuditEventV2 actual, EntityAuditEventV2 expected) {
if (expected != null) {
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
index 0d6d0c8..b1e0d2f 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
@@ -801,7 +801,10 @@ public class EntityREST {
@Path("{guid}/audit")
public List<EntityAuditEventV2> getAuditEvents(@PathParam("guid") String guid, @QueryParam("startKey") String startKey,
@QueryParam("auditAction") EntityAuditActionV2 auditAction,
- @QueryParam("count") @DefaultValue("100") short count) throws AtlasBaseException {
+ @QueryParam("count") @DefaultValue("100") short count,
+ @QueryParam("offset") @DefaultValue("-1") int offset,
+ @QueryParam("sortBy") String sortBy,
+ @QueryParam("sortOrder") String sortOrder) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
@@ -824,7 +827,9 @@ public class EntityREST {
List<EntityAuditEventV2> ret = new ArrayList<>();
- if(auditAction != null) {
+ if (sortBy != null || offset > -1) {
+ ret = auditRepository.listEventsV2(guid, auditAction, sortBy, StringUtils.equalsIgnoreCase(sortOrder, "desc"), offset, count);
+ } else if(auditAction != null) {
ret = auditRepository.listEventsV2(guid, auditAction, startKey, count);
} else {
List events = auditRepository.listEvents(guid, startKey, count);