You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/01/19 18:02:20 UTC

[atlas] branch master updated: ATLAS-4094: Add support for column sort in Entity Audit API

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new e6ee5b4  ATLAS-4094: Add support for column sort in Entity Audit API
e6ee5b4 is described below

commit e6ee5b43ee48f8d78ff84e222f50f36df4745c81
Author: Deep Singh <de...@gmail.com>
AuthorDate: Tue Jan 19 10:45:29 2021 -0600

    ATLAS-4094: Add support for column sort in Entity Audit API
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
 .../atlas/model/audit/EntityAuditEventV2.java      |  60 +++++++++++
 .../audit/CassandraBasedAuditRepository.java       |   5 +
 .../repository/audit/EntityAuditRepository.java    |  13 +++
 .../audit/HBaseBasedAuditRepository.java           | 117 ++++++++++++++++++++-
 .../audit/InMemoryEntityAuditRepository.java       |  16 +++
 .../audit/NoopEntityAuditRepository.java           |   5 +
 .../repository/audit/AuditRepositoryTestBase.java  |  33 ++++++
 .../java/org/apache/atlas/web/rest/EntityREST.java |   9 +-
 8 files changed, 255 insertions(+), 3 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
index 083acac..3afd27e 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -31,6 +31,8 @@ import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
 import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Objects;
 
 import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
@@ -46,6 +48,10 @@ import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.EN
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)
 public class EntityAuditEventV2 implements Serializable, Clearable {
+    public static final String SORT_COLUMN_USER      = "user";
+    public static final String SORT_COLUMN_ACTION    = "action";
+    public static final String SORT_COLUMN_TIMESTAMP = "timestamp";
+
     public enum EntityAuditType { ENTITY_AUDIT_V1, ENTITY_AUDIT_V2 }
 
     public enum EntityAuditActionV2 {
@@ -282,4 +288,58 @@ public class EntityAuditEventV2 implements Serializable, Clearable {
 
         return ret;
     }
+
+    /** Orders events by user name, ignoring case (events without a user sort first); ties are broken by timestamp. */
+    public static class UserComparator implements Comparator<EntityAuditEventV2> {
+        @Override
+        public int compare(EntityAuditEventV2 me, EntityAuditEventV2 other) {
+            // nullsFirst: a null user must not abort the whole sort with a NullPointerException
+            int ret = Comparator.nullsFirst(String.CASE_INSENSITIVE_ORDER).compare(me.getUser(), other.getUser());
+
+            if (ret == 0) {
+                ret = Long.compare(me.getTimestamp(), other.getTimestamp());
+            }
+            return ret;
+        }
+    }
+
+    /** Compares audit events by the case-insensitive text of their action; equal actions are ordered by timestamp. */
+    public static class ActionComparator implements Comparator<EntityAuditEventV2> {
+        @Override
+        public int compare(EntityAuditEventV2 me, EntityAuditEventV2 other) {
+            final String myAction    = me.getAction().toString();
+            final String otherAction = other.getAction().toString();
+            final int    byAction    = String.CASE_INSENSITIVE_ORDER.compare(myAction, otherAction);
+
+            // same action on both sides: fall back to chronological order
+            return byAction != 0 ? byAction : Long.compare(me.getTimestamp(), other.getTimestamp());
+        }
+    }
+
+    public static class TimestampComparator implements Comparator<EntityAuditEventV2> { // orders events by timestamp, smallest first
+        @Override
+        public int compare(EntityAuditEventV2 me, EntityAuditEventV2 other) {
+            return Long.compare(me.getTimestamp(), other.getTimestamp());
+        }
+    }
+
+    /** Sorts {@code events} in place by the named column (case-insensitive); a null or unknown column means timestamp. */
+    public static void sortEvents(List<EntityAuditEventV2> events, String sortByColumn, boolean sortOrderDesc) {
+        final Comparator<EntityAuditEventV2> comparator;
+        // Locale.ROOT: under e.g. a Turkish default locale "ACTION".toLowerCase() would not equal "action".
+        final String column = sortByColumn == null ? SORT_COLUMN_TIMESTAMP : sortByColumn.toLowerCase(java.util.Locale.ROOT);
+
+        switch (column) {
+            case SORT_COLUMN_USER:
+                comparator = new EntityAuditEventV2.UserComparator();
+                break;
+            case SORT_COLUMN_ACTION:
+                comparator = new EntityAuditEventV2.ActionComparator();
+                break;
+            case SORT_COLUMN_TIMESTAMP:
+            default:
+                comparator = new EntityAuditEventV2.TimestampComparator();
+                break;
+        }
+        events.sort(sortOrderDesc ? comparator.reversed() : comparator);
+    }
 }
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
index 8a453fd..fb890fc 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java
@@ -190,6 +190,11 @@ public class CassandraBasedAuditRepository extends AbstractStorageBasedAuditRepo
   }
 
   @Override
+  public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+    throw new NotImplementedException(); // sorted, offset-based listing is not implemented for this repository
+  }
+
+  @Override
   public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
     throw new NotImplementedException();
   }
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index 07784d1..bad2a89 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -79,6 +79,19 @@ public interface EntityAuditRepository {
      */
     List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) throws AtlasBaseException;
 
+    /**
+     * List events for the given entity id, sorted by the given column, and return one page of the sorted list.
+     * @param entityId entity id
+     * @param auditAction operation to search for; null means events of all operations
+     * @param sortByColumn name of the column to sort on (see EntityAuditEventV2.SORT_COLUMN_USER/ACTION/TIMESTAMP)
+     * @param sortOrderDesc true to sort in descending order, false for ascending order
+     * @param offset number of leading events of the sorted list to skip, used for pagination
+     * @param limit  max number of events to be returned
+     * @return list of events
+     * @throws AtlasBaseException on failure to read the events from the audit store
+     */
+    List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException;
+
     /***
      * List events for given time range where classifications have been added, deleted or updated.
      * @param fromTimestamp from timestamp
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 9fca744..b061761 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -22,11 +22,13 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.EntityAuditEvent;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
 import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
+import org.apache.atlas.utils.AtlasPerfMetrics;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -226,11 +229,12 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
         }
     }
 
+    @Override
     public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditActionV2 auditAction, String startKey, short maxResultCount) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Listing events for entity id {}, operation {}, starting key{}, maximum result count {}", entityId, auditAction.toString(), startKey, maxResultCount);
         }
-
+        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listSortedEventsV2");
         Table         table   = null;
         ResultScanner scanner = null;
 
@@ -316,6 +320,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
             try {
                 close(scanner);
                 close(table);
+                RequestContext.get().endMetricRecord(metric);
             } catch (AtlasException e) {
                 throw new AtlasBaseException(e);
             }
@@ -323,6 +328,116 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
     }
 
     @Override
+    public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})", entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit);
+        }
+
+        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listEventsV2");
+        // Normalize optional arguments: default sort column is timestamp; negative offset/limit fall back to 0/100.
+        if (sortByColumn == null) {
+            sortByColumn = EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
+        }
+
+        if (offset < 0) {
+            offset = 0;
+        }
+
+        if (limit < 0) {
+            limit = 100;
+        }
+
+        try (Table table = connection.getTable(tableName)) {
+            /*
+             * HBase Does not support query with sorted results. To support this API inmemory sort has to be performed.
+             * Audit entry can potentially have entire entity dumped into it. Loading entire audit entries for an entity can be
+             * memory intensive. Therefore we load audit entries with limited columns first, perform sort on this light weight list,
+             * then get the relevant section by removing offsets and reducing to limits. With this reduced list we create
+             * MultiRowRangeFilter and then again scan the table to get all the columns from the table this time.
+             */
+            Scan scan = new Scan().setReversed(true)
+                                  .setCaching(DEFAULT_CACHING)
+                                  .setSmall(true)
+                                  .setStopRow(Bytes.toBytes(entityId))
+                                  .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
+                                  .addColumn(COLUMN_FAMILY, COLUMN_ACTION)
+                                  .addColumn(COLUMN_FAMILY, COLUMN_USER);
+
+            if (auditAction != null) {
+                Filter filterAction = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(auditAction.toString())));
+
+                scan.setFilter(filterAction);
+            }
+
+            List<EntityAuditEventV2> events = new ArrayList<>();
+
+            try (ResultScanner scanner = table.getScanner(scan)) {
+                for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                    EntityAuditEventV2 event = fromKeyV2(result.getRow());
+
+                    event.setUser(getResultString(result, COLUMN_USER));
+                    event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));
+
+                    events.add(event);
+                }
+            }
+
+            EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
+            // Cut out the requested page; the long arithmetic keeps offset + limit from overflowing int for huge offsets.
+            events = events.subList(Math.min(events.size(), offset), (int) Math.min(events.size(), (long) offset + limit));
+
+            if (events.size() > 0) {
+                List<MultiRowRangeFilter.RowRange> ranges = new ArrayList<>();
+
+                events.forEach(e -> {
+                    ranges.add(new MultiRowRangeFilter.RowRange(e.getEventKey(), true, e.getEventKey(), true));
+                });
+
+                scan = new Scan().setReversed(true)
+                                 .setCaching(DEFAULT_CACHING)
+                                 .setSmall(true)
+                                 .setStopRow(Bytes.toBytes(entityId))
+                                 .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
+                                 .setFilter(new MultiRowRangeFilter(ranges));
+
+                try (ResultScanner scanner = table.getScanner(scan)) {
+                    events = new ArrayList<>();
+
+                    for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                        EntityAuditEventV2 event = fromKeyV2(result.getRow());
+
+                        event.setUser(getResultString(result, COLUMN_USER));
+                        event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));
+                        event.setDetails(getResultString(result, COLUMN_DETAIL));
+
+                        if (persistEntityDefinition) {
+                            String colDef = getResultString(result, COLUMN_DEFINITION);
+
+                            if (colDef != null) {
+                                event.setEntityDefinition(colDef);
+                            }
+                        }
+
+                        events.add(event);
+                    }
+                }
+                // The second scan returns rows in scan order, so re-apply the requested ordering to the loaded page.
+                EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("<== HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #records returned {}", entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit, events.size());
+            }
+
+            return events;
+        } catch (IOException e) {
+            throw new AtlasBaseException(e);
+        } finally {
+            RequestContext.get().endMetricRecord(metric);
+        }
+    }
+
+    @Override
     public List<Object> listEvents(String entityId, String startKey, short maxResults) throws AtlasBaseException {
         List ret = listEventsV2(entityId, null, startKey, maxResults);
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
index 900df02..51efff1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -102,6 +102,22 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
     }
 
     @Override
+    public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+        // Same defaults as HBaseBasedAuditRepository: timestamp sort, offset 0, limit 100 (EntityREST passes offset=-1 / null sortBy when unset).
+        final String sortColumn = sortByColumn != null ? sortByColumn : EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
+        final int    fromOffset = Math.max(offset, 0);
+        final int    maxCount   = limit < 0 ? 100 : limit;
+        List<EntityAuditEventV2> events = new ArrayList<>();
+        for (EntityAuditEventV2 event : auditEventsV2.tailMap(entityId).values()) {
+            if (event.getEntityId().equals(entityId) && (auditAction == null || auditAction == event.getAction())) {
+                events.add(event);
+            }
+        }
+        EntityAuditEventV2.sortEvents(events, sortColumn, sortOrderDesc);
+        return events.subList(Math.min(events.size(), fromOffset), (int) Math.min(events.size(), (long) fromOffset + maxCount));
+    }
+
+    @Override
     public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResults) {
         List<EntityAuditEventV2> events     = new ArrayList<>();
         String                   myStartKey = startKey;
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
index ef9e259..c62bb7e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
@@ -63,6 +63,11 @@ public class NoopEntityAuditRepository implements EntityAuditRepository {
     }
 
     @Override
+    public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+        return Collections.emptyList(); // always empty, whatever the arguments
+    }
+
+    @Override
     public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) {
         return Collections.emptyList();
     }
diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
index bf4f395..679df3c 100644
--- a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
+++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
@@ -158,6 +158,39 @@ public class AuditRepositoryTestBase {
 
         assertEquals(events.size(), 0);
     }
+    
+    @Test
+    public void testSortListV2() throws Exception {
+        final String      entityId = "id1" + rand();
+        final long        baseTime = System.currentTimeMillis();
+        final AtlasEntity entity   = new AtlasEntity(rand());
+        final EntityAuditEventV2 updateByA = new EntityAuditEventV2(entityId, baseTime, "user-a", EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details-1", entity);
+        final EntityAuditEventV2 deleteByC = new EntityAuditEventV2(entityId, baseTime + 1, "user-C", EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details-2", entity);
+        final EntityAuditEventV2 createByB = new EntityAuditEventV2(entityId, baseTime + 2, "User-b", EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE, "details-3", entity);
+
+        for (EntityAuditEventV2 stored : new EntityAuditEventV2[] { updateByA, deleteByC, createByB }) {
+            eventRepository.putEventsV2(stored);
+        }
+
+        // timestamp ascending, limited to the first two events
+        List<EntityAuditEventV2> sorted = eventRepository.listEventsV2(entityId, null, "timestamp", false, 0, (short) 2);
+        assertEquals(sorted.size(), 2);
+        assertEventV2Equals(sorted.get(0), updateByA);
+        assertEventV2Equals(sorted.get(1), deleteByC);
+
+        // user, case-insensitive: user-a, User-b, user-C
+        sorted = eventRepository.listEventsV2(entityId, null, "user", false, 0, (short) 3);
+        assertEquals(sorted.size(), 3);
+        assertEventV2Equals(sorted.get(0), updateByA);
+        assertEventV2Equals(sorted.get(1), createByB);
+        assertEventV2Equals(sorted.get(2), deleteByC);
+
+        // action name: ENTITY_CREATE, ENTITY_DELETE, ENTITY_UPDATE
+        sorted = eventRepository.listEventsV2(entityId, null, "action", false, 0, (short) 3);
+        assertEquals(sorted.size(), 3);
+        assertEventV2Equals(sorted.get(0), createByB);
+        assertEventV2Equals(sorted.get(1), deleteByC);
+        assertEventV2Equals(sorted.get(2), updateByA);
+    }
 
     protected void assertEventV2Equals(EntityAuditEventV2 actual, EntityAuditEventV2 expected) {
         if (expected != null) {
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
index 0d6d0c8..b1e0d2f 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
@@ -801,7 +801,10 @@ public class EntityREST {
     @Path("{guid}/audit")
     public List<EntityAuditEventV2> getAuditEvents(@PathParam("guid") String guid, @QueryParam("startKey") String startKey,
                                                    @QueryParam("auditAction") EntityAuditActionV2 auditAction,
-                                                   @QueryParam("count") @DefaultValue("100") short count) throws AtlasBaseException {
+                                                   @QueryParam("count") @DefaultValue("100") short count,
+                                                   @QueryParam("offset") @DefaultValue("-1") int offset,
+                                                   @QueryParam("sortBy") String sortBy,
+                                                   @QueryParam("sortOrder") String sortOrder) throws AtlasBaseException {
         AtlasPerfTracer perf = null;
 
         try {
@@ -824,7 +827,9 @@ public class EntityREST {
 
             List<EntityAuditEventV2> ret = new ArrayList<>();
 
-            if(auditAction != null) {
+            if (sortBy != null || offset > -1) {
+                ret = auditRepository.listEventsV2(guid, auditAction, sortBy, StringUtils.equalsIgnoreCase(sortOrder, "desc"), offset, count);
+            } else if(auditAction != null) {
                 ret = auditRepository.listEventsV2(guid, auditAction, startKey, count);
             } else {
                 List events = auditRepository.listEvents(guid, startKey, count);