You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/01/26 10:20:55 UTC

incubator-atlas git commit: ATLAS-1436: Metrics caching and UTs (Part 2)

Repository: incubator-atlas
Updated Branches:
  refs/heads/master bf377abbb -> 92d028178


ATLAS-1436: Metrics caching and UTs (Part 2)

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/92d02817
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/92d02817
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/92d02817

Branch: refs/heads/master
Commit: 92d02817846d9431042923182f7feeb1a3a2a7d2
Parents: bf377ab
Author: apoorvnaik <an...@hortonworks.com>
Authored: Wed Jan 25 15:35:28 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Jan 26 02:20:47 2017 -0800

----------------------------------------------------------------------
 distro/src/conf/atlas-application.properties    |   1 +
 .../atlas/model/metrics/AtlasMetrics.java       |   2 +-
 .../apache/atlas/services/MetricsService.java   | 161 +++++++++++++------
 .../atlas/services/MetricsServiceTest.java      | 112 +++++++++++++
 4 files changed, 229 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 7f79ad7..303ce7b 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -207,6 +207,7 @@ atlas.feature.taxonomy.enable=true
 
 ############ Atlas Metric/Stats configs ################
 # Format: atlas.metric.query.<key>.<name>
+atlas.metric.query.cache.ttlInSecs=900
 #atlas.metric.query.general.typeCount=
 #atlas.metric.query.general.typeUnusedCount=
 #atlas.metric.query.general.entityCount=

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
index 602cdb4..edf3cc5 100644
--- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
+++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
@@ -61,7 +61,7 @@ public class AtlasMetrics {
     }
 
     @JsonIgnore
-    public void addData(String groupKey, String key, Integer value) {
+    public void addData(String groupKey, String key, Number value) {
         Map<String, Map<String, Number>> data = this.data;
         if (data == null) {
             data = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/repository/src/main/java/org/apache/atlas/services/MetricsService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
index 855e402..e2cc369 100644
--- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java
+++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
@@ -17,18 +17,21 @@
  */
 package org.apache.atlas.services;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Singleton;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.model.metrics.AtlasMetrics;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.inject.Inject;
 import javax.script.ScriptException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -36,54 +39,128 @@ import java.util.Map;
 public class MetricsService {
     private static final Logger LOG = LoggerFactory.getLogger(MetricsService.class);
 
-    public static final String METRIC_QUERY_PREFIX = "atlas.metric.query.";
+    public static final String METRIC_QUERY_PREFIX       = "atlas.metric.query.";
+    public static final String METRIC_QUERY_CACHE_TTL    = "atlas.metric.query.cache.ttlInSecs";
+    public static final int    DEFAULT_CACHE_TTL_IN_SECS = 900;
 
-    public static final String TYPE = "type";
-    public static final String ENTITY = "entity";
-    public static final String TAG = "tag";
+    public static final String TYPE    = "type";
+    public static final String ENTITY  = "entity";
+    public static final String TAG     = "tag";
     public static final String GENERAL = "general";
 
-    public static final String METRIC_TYPE_COUNT = TYPE + "Count";
+    public static final String METRIC_TYPE_COUNT        = TYPE + "Count";
     public static final String METRIC_TYPE_UNUSED_COUNT = TYPE + "UnusedCount";
-    public static final String METRIC_TYPE_ENTITIES = TYPE + "Entities";
+    public static final String METRIC_TYPE_ENTITIES     = TYPE + "Entities";
 
-    public static final String METRIC_ENTITY_COUNT = ENTITY + "Count";
-    public static final String METRIC_ENTITY_DELETED = ENTITY + "Deleted";
+    public static final String METRIC_ENTITY_COUNT    = ENTITY + "Count";
+    public static final String METRIC_ENTITY_DELETED  = ENTITY + "Deleted";
     public static final String METRIC_TAGGED_ENTITIES = ENTITY + "Tagged";
     public static final String METRIC_TAGS_PER_ENTITY = ENTITY + "Tags";
 
-    public static final String METRIC_TAG_COUNT = TAG + "Count";
+    public static final String METRIC_TAG_COUNT        = TAG + "Count";
     public static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities";
 
-    private static AtlasGraph atlasGraph;
-    private static Configuration configuration;
+    public static final String METRIC_COLLECTION_TIME = "collectionTime";
 
-    public MetricsService() throws AtlasException {
-        atlasGraph = AtlasGraphProvider.getGraphInstance();
-        configuration = ApplicationProperties.get();
+    private static Configuration    configuration = null;
+    private final AtlasGraph        atlasGraph;
+    private final AtlasTypeRegistry atlasTypeRegistry;
+    private final int               cacheTTLInSecs;
+
+    private AtlasMetrics cachedMetrics       = null;
+    private long         cacheExpirationTime = 0;
+
+
+    @Inject
+    public MetricsService(AtlasTypeRegistry typeRegistry) throws AtlasException {
+        this(ApplicationProperties.get(), AtlasGraphProvider.getGraphInstance(), typeRegistry);
+    }
+
+    @VisibleForTesting
+    MetricsService(Configuration configuration, AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
+        MetricsService.configuration = configuration;
+
+        atlasTypeRegistry = typeRegistry;
+        atlasGraph        = graph;
+        cacheTTLInSecs    = configuration != null ? configuration.getInt(METRIC_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS)
+                                                  : DEFAULT_CACHE_TTL_IN_SECS;
     }
 
     @SuppressWarnings("unchecked")
     public AtlasMetrics getMetrics() {
-        AtlasMetrics metrics = new AtlasMetrics();
-        for (MetricQuery metricQuery : MetricQuery.values()) {
-            try {
-                Object result = atlasGraph.executeGremlinScript(metricQuery.query, false);
-                if (result instanceof Number) {
-                    metrics.addData(metricQuery.type, metricQuery.name, ((Number) result).intValue());
-                } else if (result instanceof List) {
-                    for (Map resultMap : (List<Map>) result) {
-                        metrics.addData(metricQuery.type, (String) resultMap.get("key"), ((Number) resultMap.get("value")).intValue());
+        if (!isCacheValid()) {
+            AtlasMetrics metrics = new AtlasMetrics();
+
+            for (MetricQuery metricQuery : MetricQuery.values()) {
+                try {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Executing query: {}", metricQuery);
                     }
-                } else {
-                    LOG.warn("Unhandled return type {} for {}. Ignoring", result.getClass().getSimpleName(), metricQuery);
+
+                    if (metricQuery == MetricQuery.ENTITIES_PER_TYPE) {
+                        Collection<String> entityDefNames = atlasTypeRegistry.getAllEntityDefNames();
+
+                        for (String entityDefName : entityDefNames) {
+                            String formattedQuery = String.format(metricQuery.query, entityDefName);
+
+                            executeGremlinQuery(metrics, metricQuery.type, entityDefName, formattedQuery);
+                        }
+                    } else {
+                        executeGremlinQuery(metrics, metricQuery.type, metricQuery.name, metricQuery.query);
+                    }
+                } catch (ScriptException e) {
+                    LOG.error("Gremlin execution failed for metric {}", metricQuery, e);
                 }
-            } catch (ScriptException e) {
-                LOG.error("Gremlin execution failed for metric {}", metricQuery, e);
             }
+
+            long collectionTime = System.currentTimeMillis();
+
+            metrics.addData(GENERAL, METRIC_COLLECTION_TIME, collectionTime);
+
+            this.cachedMetrics       = metrics;
+            this.cacheExpirationTime = (collectionTime + cacheTTLInSecs * 1000);
         }
 
-        return metrics;
+        return cachedMetrics;
+    }
+
+    private void executeGremlinQuery(AtlasMetrics metrics, String type, String name, String query) throws ScriptException {
+        Object result = atlasGraph.executeGremlinScript(query, false);
+
+        if (result instanceof Number) {
+            metrics.addData(type, name, ((Number) result).intValue());
+        } else if (result instanceof List) {
+            for (Map resultMap : (List<Map>) result) {
+                metrics.addData(type, (String) resultMap.get("key"), ((Number) resultMap.get("value")).intValue());
+            }
+        } else {
+            String returnClassName = result != null ? result.getClass().getSimpleName() : "null";
+
+            LOG.warn("Unhandled return type {} for {}. Ignoring", returnClassName, query);
+        }
+    }
+
+    private boolean isCacheValid() {
+        boolean valid = cachedMetrics != null && System.currentTimeMillis() < cacheExpirationTime;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("cachedMetrics: {}", cachedMetrics != null);
+            LOG.debug("cacheExpirationTime: {}", cacheExpirationTime);
+            LOG.debug("valid: {}", valid);
+        }
+
+        return valid;
+    }
+
+    private static String getQuery(String type, String name, String defaultQuery) {
+        String ret = configuration != null ? configuration.getString(METRIC_QUERY_PREFIX + type + "." + name, defaultQuery)
+                                           : defaultQuery;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("query for {}.{}: {}", type, name, ret);
+        }
+
+        return ret;
     }
 
     /**
@@ -92,35 +169,27 @@ public class MetricsService {
      * The default behavior is to read from the properties and override the statically type query if the configured
      * query is not blank/empty.
      */
-    enum MetricQuery {
+    private enum MetricQuery {
         TYPE_COUNT(GENERAL, METRIC_TYPE_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() != 'TRAIT'}).count()"),
         UNUSED_TYPE_COUNT(GENERAL, METRIC_TYPE_UNUSED_COUNT, "g.V('__type', 'typeSystem').filter({ it.'__type.category'.name() != 'TRAIT' && it.inE.count() == 0}).count()"),
         ENTITY_COUNT(GENERAL, METRIC_ENTITY_COUNT, "g.V().has('__superTypeNames', T.in, ['Referenceable']).count()"),
         TAGS_COUNT(GENERAL, METRIC_TAG_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() == 'TRAIT'}).count()"),
         DELETED_ENTITY_COUNT(GENERAL, METRIC_ENTITY_DELETED, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__status', 'DELETED').count()"),
 
-        ENTITIES_PER_TYPE(ENTITY, METRIC_TYPE_ENTITIES, "g.V().has('__type', 'typeSystem').has('__type.name').filter({it.'__type.category'.name() != 'TRAIT'}).transform{[key: it.'__type.name', value: it.inE.count()]}.dedup().toList()"),
+        ENTITIES_PER_TYPE(ENTITY, METRIC_TYPE_ENTITIES, "g.V().has('__typeName', T.in, ['%s']).count()"),
         TAGGED_ENTITIES(ENTITY, METRIC_TAGGED_ENTITIES, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').count()"),
 
         TAGS_PER_ENTITY(TAG, METRIC_TAGS_PER_ENTITY, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').transform{[ key: it.'Referenceable.qualifiedName', value: it.'__traitNames'.size()]}.dedup().toList()"),
         ;
-        private String type;
-        private String name;
-        private String query;
-
-        private static String getQuery(String type, String name) {
-            String metricQueryKey = METRIC_QUERY_PREFIX + type + "." + name;
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Looking for configured query {}", metricQueryKey);
-            }
-            return configuration.getString(metricQueryKey, "");
-        }
+
+        private final String type;
+        private final String name;
+        private final String query;
 
         MetricQuery(String type, String name, String query) {
-            this.type = type;
-            this.name = name;
-            String configuredQuery = getQuery(type, name);
-            this.query = StringUtils.isNotEmpty(configuredQuery) ? configuredQuery : query;
+            this.type  = type;
+            this.name  = name;
+            this.query = MetricsService.getQuery(type, name, query);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
new file mode 100644
index 0000000..5d2e460
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -0,0 +1,112 @@
+package org.apache.atlas.services;
+
+import org.apache.atlas.model.metrics.AtlasMetrics;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.script.ScriptException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class MetricsServiceTest {
+    private Configuration mockConfig = mock(Configuration.class);
+    private AtlasTypeRegistry mockTypeRegistry = mock(AtlasTypeRegistry.class);
+    private AtlasGraph mockGraph = mock(AtlasGraph.class);
+    private MetricsService metricsService;
+
+    private List<Map> mockMapList = new ArrayList<>();
+    private Number mockCount = 10;
+
+    @BeforeClass
+    public void init() throws ScriptException {
+        Map<String, Object> aMockMap = new HashMap<>();
+        Map<String, Object> bMockMap = new HashMap<>();
+        Map<String, Object> cMockMap = new HashMap<>();
+        aMockMap.put("key", "a");
+        aMockMap.put("value", 1);
+
+        bMockMap.put("key", "b");
+        bMockMap.put("value", 2);
+
+        cMockMap.put("key", "c");
+        cMockMap.put("value", 3);
+        mockMapList.add(aMockMap);
+        mockMapList.add(bMockMap);
+        mockMapList.add(cMockMap);
+
+        when(mockConfig.getInt(anyString(), anyInt())).thenReturn(5);
+        assertEquals(mockConfig.getInt("test", 1), 5);
+        when(mockTypeRegistry.getAllEntityDefNames()).thenReturn(Arrays.asList("a", "b", "c"));
+        setupMockGraph();
+        metricsService = new MetricsService(mockConfig, mockGraph, mockTypeRegistry);
+    }
+
+    private void setupMockGraph() throws ScriptException {
+        if (mockGraph == null) mockGraph = mock(AtlasGraph.class);
+        when(mockGraph.executeGremlinScript(anyString(), eq(false))).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                if (((String)invocationOnMock.getArguments()[0]).contains("count()")) {
+                    return mockCount;
+                } else {
+                    return mockMapList;
+                }
+            }
+        });
+    }
+
+    @Test
+    public void testGetMetrics() throws InterruptedException, ScriptException {
+        assertNotNull(metricsService);
+        AtlasMetrics metrics = metricsService.getMetrics();
+        assertNotNull(metrics);
+        Number aCount = metrics.getMetric("entity", "a");
+        assertNotNull(aCount);
+        assertEquals(aCount, 10);
+
+        Number bCount = metrics.getMetric("entity", "b");
+        assertNotNull(bCount);
+        assertEquals(bCount, 10);
+
+        Number cCount = metrics.getMetric("entity", "c");
+        assertNotNull(cCount);
+        assertEquals(cCount, 10);
+
+        Number aTags = metrics.getMetric("tag", "a");
+        assertNotNull(aTags);
+        assertEquals(aTags, 1);
+
+        Number bTags = metrics.getMetric("tag", "b");
+        assertNotNull(bTags);
+        assertEquals(bTags, 2);
+
+        Number cTags = metrics.getMetric("tag", "c");
+        assertNotNull(cTags);
+        assertEquals(cTags, 3);
+
+        verify(mockGraph, atLeastOnce()).executeGremlinScript(anyString(), anyBoolean());
+
+        // Subsequent call within the cache timeout window
+        metricsService.getMetrics();
+        verifyZeroInteractions(mockGraph);
+
+        // Now test the cache refresh
+        Thread.sleep(6000);
+        metricsService.getMetrics();
+        verify(mockGraph, atLeastOnce()).executeGremlinScript(anyString(), anyBoolean());
+    }
+}
\ No newline at end of file