You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/01/26 10:20:55 UTC
incubator-atlas git commit: ATLAS-1436: Metrics caching and UTs (Part 2)
Repository: incubator-atlas
Updated Branches:
refs/heads/master bf377abbb -> 92d028178
ATLAS-1436: Metrics caching and UTs (Part 2)
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/92d02817
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/92d02817
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/92d02817
Branch: refs/heads/master
Commit: 92d02817846d9431042923182f7feeb1a3a2a7d2
Parents: bf377ab
Author: apoorvnaik <an...@hortonworks.com>
Authored: Wed Jan 25 15:35:28 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Thu Jan 26 02:20:47 2017 -0800
----------------------------------------------------------------------
distro/src/conf/atlas-application.properties | 1 +
.../atlas/model/metrics/AtlasMetrics.java | 2 +-
.../apache/atlas/services/MetricsService.java | 161 +++++++++++++------
.../atlas/services/MetricsServiceTest.java | 112 +++++++++++++
4 files changed, 229 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 7f79ad7..303ce7b 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -207,6 +207,7 @@ atlas.feature.taxonomy.enable=true
############ Atlas Metric/Stats configs ################
# Format: atlas.metric.query.<key>.<name>
+atlas.metric.query.cache.ttlInSecs=900
#atlas.metric.query.general.typeCount=
#atlas.metric.query.general.typeUnusedCount=
#atlas.metric.query.general.entityCount=
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
index 602cdb4..edf3cc5 100644
--- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
+++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
@@ -61,7 +61,7 @@ public class AtlasMetrics {
}
@JsonIgnore
- public void addData(String groupKey, String key, Integer value) {
+ public void addData(String groupKey, String key, Number value) {
Map<String, Map<String, Number>> data = this.data;
if (data == null) {
data = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/repository/src/main/java/org/apache/atlas/services/MetricsService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
index 855e402..e2cc369 100644
--- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java
+++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
@@ -17,18 +17,21 @@
*/
package org.apache.atlas.services;
+import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.inject.Inject;
import javax.script.ScriptException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -36,54 +39,128 @@ import java.util.Map;
public class MetricsService {
private static final Logger LOG = LoggerFactory.getLogger(MetricsService.class);
- public static final String METRIC_QUERY_PREFIX = "atlas.metric.query.";
+ public static final String METRIC_QUERY_PREFIX = "atlas.metric.query.";
+ public static final String METRIC_QUERY_CACHE_TTL = "atlas.metric.query.cache.ttlInSecs";
+ public static final int DEFAULT_CACHE_TTL_IN_SECS = 900;
- public static final String TYPE = "type";
- public static final String ENTITY = "entity";
- public static final String TAG = "tag";
+ public static final String TYPE = "type";
+ public static final String ENTITY = "entity";
+ public static final String TAG = "tag";
public static final String GENERAL = "general";
- public static final String METRIC_TYPE_COUNT = TYPE + "Count";
+ public static final String METRIC_TYPE_COUNT = TYPE + "Count";
public static final String METRIC_TYPE_UNUSED_COUNT = TYPE + "UnusedCount";
- public static final String METRIC_TYPE_ENTITIES = TYPE + "Entities";
+ public static final String METRIC_TYPE_ENTITIES = TYPE + "Entities";
- public static final String METRIC_ENTITY_COUNT = ENTITY + "Count";
- public static final String METRIC_ENTITY_DELETED = ENTITY + "Deleted";
+ public static final String METRIC_ENTITY_COUNT = ENTITY + "Count";
+ public static final String METRIC_ENTITY_DELETED = ENTITY + "Deleted";
public static final String METRIC_TAGGED_ENTITIES = ENTITY + "Tagged";
public static final String METRIC_TAGS_PER_ENTITY = ENTITY + "Tags";
- public static final String METRIC_TAG_COUNT = TAG + "Count";
+ public static final String METRIC_TAG_COUNT = TAG + "Count";
public static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities";
- private static AtlasGraph atlasGraph;
- private static Configuration configuration;
+ public static final String METRIC_COLLECTION_TIME = "collectionTime";
- public MetricsService() throws AtlasException {
- atlasGraph = AtlasGraphProvider.getGraphInstance();
- configuration = ApplicationProperties.get();
+ private static Configuration configuration = null;
+ private final AtlasGraph atlasGraph;
+ private final AtlasTypeRegistry atlasTypeRegistry;
+ private final int cacheTTLInSecs;
+
+ private AtlasMetrics cachedMetrics = null;
+ private long cacheExpirationTime = 0;
+
+
+ @Inject
+ public MetricsService(AtlasTypeRegistry typeRegistry) throws AtlasException {
+ this(ApplicationProperties.get(), AtlasGraphProvider.getGraphInstance(), typeRegistry);
+ }
+
+ @VisibleForTesting
+ MetricsService(Configuration configuration, AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
+ MetricsService.configuration = configuration;
+
+ atlasTypeRegistry = typeRegistry;
+ atlasGraph = graph;
+ cacheTTLInSecs = configuration != null ? configuration.getInt(METRIC_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS)
+ : DEFAULT_CACHE_TTL_IN_SECS;
}
@SuppressWarnings("unchecked")
public AtlasMetrics getMetrics() {
- AtlasMetrics metrics = new AtlasMetrics();
- for (MetricQuery metricQuery : MetricQuery.values()) {
- try {
- Object result = atlasGraph.executeGremlinScript(metricQuery.query, false);
- if (result instanceof Number) {
- metrics.addData(metricQuery.type, metricQuery.name, ((Number) result).intValue());
- } else if (result instanceof List) {
- for (Map resultMap : (List<Map>) result) {
- metrics.addData(metricQuery.type, (String) resultMap.get("key"), ((Number) resultMap.get("value")).intValue());
+ if (!isCacheValid()) {
+ AtlasMetrics metrics = new AtlasMetrics();
+
+ for (MetricQuery metricQuery : MetricQuery.values()) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing query: {}", metricQuery);
}
- } else {
- LOG.warn("Unhandled return type {} for {}. Ignoring", result.getClass().getSimpleName(), metricQuery);
+
+ if (metricQuery == MetricQuery.ENTITIES_PER_TYPE) {
+ Collection<String> entityDefNames = atlasTypeRegistry.getAllEntityDefNames();
+
+ for (String entityDefName : entityDefNames) {
+ String formattedQuery = String.format(metricQuery.query, entityDefName);
+
+ executeGremlinQuery(metrics, metricQuery.type, entityDefName, formattedQuery);
+ }
+ } else {
+ executeGremlinQuery(metrics, metricQuery.type, metricQuery.name, metricQuery.query);
+ }
+ } catch (ScriptException e) {
+ LOG.error("Gremlin execution failed for metric {}", metricQuery, e);
}
- } catch (ScriptException e) {
- LOG.error("Gremlin execution failed for metric {}", metricQuery, e);
}
+
+ long collectionTime = System.currentTimeMillis();
+
+ metrics.addData(GENERAL, METRIC_COLLECTION_TIME, collectionTime);
+
+ this.cachedMetrics = metrics;
+ this.cacheExpirationTime = (collectionTime + cacheTTLInSecs * 1000);
}
- return metrics;
+ return cachedMetrics;
+ }
+
+ private void executeGremlinQuery(AtlasMetrics metrics, String type, String name, String query) throws ScriptException {
+ Object result = atlasGraph.executeGremlinScript(query, false);
+
+ if (result instanceof Number) {
+ metrics.addData(type, name, ((Number) result).intValue());
+ } else if (result instanceof List) {
+ for (Map resultMap : (List<Map>) result) {
+ metrics.addData(type, (String) resultMap.get("key"), ((Number) resultMap.get("value")).intValue());
+ }
+ } else {
+ String returnClassName = result != null ? result.getClass().getSimpleName() : "null";
+
+ LOG.warn("Unhandled return type {} for {}. Ignoring", returnClassName, query);
+ }
+ }
+
+ private boolean isCacheValid() {
+ boolean valid = cachedMetrics != null && System.currentTimeMillis() < cacheExpirationTime;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("cachedMetrics: {}", cachedMetrics != null);
+ LOG.debug("cacheExpirationTime: {}", cacheExpirationTime);
+ LOG.debug("valid: {}", valid);
+ }
+
+ return valid;
+ }
+
+ private static String getQuery(String type, String name, String defaultQuery) {
+ String ret = configuration != null ? configuration.getString(METRIC_QUERY_PREFIX + type + "." + name, defaultQuery)
+ : defaultQuery;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("query for {}.{}: {}", type, name, ret);
+ }
+
+ return ret;
}
/**
@@ -92,35 +169,27 @@ public class MetricsService {
* The default behavior is to read from the properties and override the statically type query if the configured
* query is not blank/empty.
*/
- enum MetricQuery {
+ private enum MetricQuery {
TYPE_COUNT(GENERAL, METRIC_TYPE_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() != 'TRAIT'}).count()"),
UNUSED_TYPE_COUNT(GENERAL, METRIC_TYPE_UNUSED_COUNT, "g.V('__type', 'typeSystem').filter({ it.'__type.category'.name() != 'TRAIT' && it.inE.count() == 0}).count()"),
ENTITY_COUNT(GENERAL, METRIC_ENTITY_COUNT, "g.V().has('__superTypeNames', T.in, ['Referenceable']).count()"),
TAGS_COUNT(GENERAL, METRIC_TAG_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() == 'TRAIT'}).count()"),
DELETED_ENTITY_COUNT(GENERAL, METRIC_ENTITY_DELETED, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__status', 'DELETED').count()"),
- ENTITIES_PER_TYPE(ENTITY, METRIC_TYPE_ENTITIES, "g.V().has('__type', 'typeSystem').has('__type.name').filter({it.'__type.category'.name() != 'TRAIT'}).transform{[key: it.'__type.name', value: it.inE.count()]}.dedup().toList()"),
+ ENTITIES_PER_TYPE(ENTITY, METRIC_TYPE_ENTITIES, "g.V().has('__typeName', T.in, ['%s']).count()"),
TAGGED_ENTITIES(ENTITY, METRIC_TAGGED_ENTITIES, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').count()"),
TAGS_PER_ENTITY(TAG, METRIC_TAGS_PER_ENTITY, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').transform{[ key: it.'Referenceable.qualifiedName', value: it.'__traitNames'.size()]}.dedup().toList()"),
;
- private String type;
- private String name;
- private String query;
-
- private static String getQuery(String type, String name) {
- String metricQueryKey = METRIC_QUERY_PREFIX + type + "." + name;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Looking for configured query {}", metricQueryKey);
- }
- return configuration.getString(metricQueryKey, "");
- }
+
+ private final String type;
+ private final String name;
+ private final String query;
MetricQuery(String type, String name, String query) {
- this.type = type;
- this.name = name;
- String configuredQuery = getQuery(type, name);
- this.query = StringUtils.isNotEmpty(configuredQuery) ? configuredQuery : query;
+ this.type = type;
+ this.name = name;
+ this.query = MetricsService.getQuery(type, name, query);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
new file mode 100644
index 0000000..5d2e460
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -0,0 +1,112 @@
+package org.apache.atlas.services;
+
+import org.apache.atlas.model.metrics.AtlasMetrics;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.script.ScriptException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class MetricsServiceTest {
+ private Configuration mockConfig = mock(Configuration.class);
+ private AtlasTypeRegistry mockTypeRegistry = mock(AtlasTypeRegistry.class);
+ private AtlasGraph mockGraph = mock(AtlasGraph.class);
+ private MetricsService metricsService;
+
+ private List<Map> mockMapList = new ArrayList<>();
+ private Number mockCount = 10;
+
+ @BeforeClass
+ public void init() throws ScriptException {
+ Map<String, Object> aMockMap = new HashMap<>();
+ Map<String, Object> bMockMap = new HashMap<>();
+ Map<String, Object> cMockMap = new HashMap<>();
+ aMockMap.put("key", "a");
+ aMockMap.put("value", 1);
+
+ bMockMap.put("key", "b");
+ bMockMap.put("value", 2);
+
+ cMockMap.put("key", "c");
+ cMockMap.put("value", 3);
+ mockMapList.add(aMockMap);
+ mockMapList.add(bMockMap);
+ mockMapList.add(cMockMap);
+
+ when(mockConfig.getInt(anyString(), anyInt())).thenReturn(5);
+ assertEquals(mockConfig.getInt("test", 1), 5);
+ when(mockTypeRegistry.getAllEntityDefNames()).thenReturn(Arrays.asList("a", "b", "c"));
+ setupMockGraph();
+ metricsService = new MetricsService(mockConfig, mockGraph, mockTypeRegistry);
+ }
+
+ private void setupMockGraph() throws ScriptException {
+ if (mockGraph == null) mockGraph = mock(AtlasGraph.class);
+ when(mockGraph.executeGremlinScript(anyString(), eq(false))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if (((String)invocationOnMock.getArguments()[0]).contains("count()")) {
+ return mockCount;
+ } else {
+ return mockMapList;
+ }
+ }
+ });
+ }
+
+ @Test
+ public void testGetMetrics() throws InterruptedException, ScriptException {
+ assertNotNull(metricsService);
+ AtlasMetrics metrics = metricsService.getMetrics();
+ assertNotNull(metrics);
+ Number aCount = metrics.getMetric("entity", "a");
+ assertNotNull(aCount);
+ assertEquals(aCount, 10);
+
+ Number bCount = metrics.getMetric("entity", "b");
+ assertNotNull(bCount);
+ assertEquals(bCount, 10);
+
+ Number cCount = metrics.getMetric("entity", "c");
+ assertNotNull(cCount);
+ assertEquals(cCount, 10);
+
+ Number aTags = metrics.getMetric("tag", "a");
+ assertNotNull(aTags);
+ assertEquals(aTags, 1);
+
+ Number bTags = metrics.getMetric("tag", "b");
+ assertNotNull(bTags);
+ assertEquals(bTags, 2);
+
+ Number cTags = metrics.getMetric("tag", "c");
+ assertNotNull(cTags);
+ assertEquals(cTags, 3);
+
+ verify(mockGraph, atLeastOnce()).executeGremlinScript(anyString(), anyBoolean());
+
+ // Subsequent call within the cache timeout window
+ metricsService.getMetrics();
+ verifyZeroInteractions(mockGraph);
+
+ // Now test the cache refresh
+ Thread.sleep(6000);
+ metricsService.getMetrics();
+ verify(mockGraph, atLeastOnce()).executeGremlinScript(anyString(), anyBoolean());
+ }
+}
\ No newline at end of file