You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2019/04/10 16:58:04 UTC
[atlas] branch branch-2.0 updated: ATLAS-3071: updated stats/metrics module to collect notification metrics
This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 512ed81 ATLAS-3071: updated stats/metrics module to collect notification metrics
512ed81 is described below
commit 512ed8181f9cbfc530dc9f9a38502e5daafd7a22
Author: Madhan Neethiraj <ma...@apache.org>
AuthorDate: Thu Mar 21 16:35:12 2019 -0700
ATLAS-3071: updated stats/metrics module to collect notification metrics
Co-authored-by: "lma <lm...@cloudera.com>"
(cherry picked from commit 1a5009ea7ec589e5809aae5ead8fac2467394f25)
---
.../org/apache/atlas/model/AtlasStatistics.java | 79 ------
.../apache/atlas/model/metrics/AtlasMetrics.java | 57 ++++-
.../org/apache/atlas/services/MetricsService.java | 116 ++++-----
.../org/apache/atlas/util/AtlasMetricsCounter.java | 268 ++++++++++++++++++++
.../org/apache/atlas/util/AtlasMetricsUtil.java | 271 ++++++++++++++++++++
.../java/org/apache/atlas/util/StatisticsUtil.java | 274 ---------------------
.../apache/atlas/services/MetricsServiceTest.java | 115 ++++++++-
.../notification/NotificationHookConsumer.java | 107 ++++----
.../web/service/ActiveInstanceElectorService.java | 41 ++-
.../NotificationHookConsumerKafkaTest.java | 10 +-
.../notification/NotificationHookConsumerTest.java | 32 +--
.../service/ActiveInstanceElectorServiceTest.java | 32 +--
12 files changed, 866 insertions(+), 536 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java b/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java
deleted file mode 100644
index 0ecbd9a..0000000
--- a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.model;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
-import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
-
-/**
- * Atlas statistics
- */
-@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
-@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class AtlasStatistics {
- public static final String STAT_SERVER_START_TS = "Server:upFrom";
- public static final String STAT_SERVER_ACTIVE_TS = "Server:activateFrom";
- public static final String STAT_SERVER_UP_SINCE = "Server:upTime";
- public static final String STAT_START_OFFSET = "Notification:ATLAS_HOOK:offsetStart";
- public static final String STAT_CURRENT_OFFSET = "Notification:ATLAS_HOOK:offsetCurrent";
- public static final String STAT_SOLR_STATUS = "ConnectionStatus:Solr";
- public static final String STAT_HBASE_STATUS = "ConnectionStatus:HBase";
- public static final String STAT_LAST_MESSAGE_PROCESSED_TIME_TS = "Notification:ATLAS_HOOK:messageLastProcessedAt";
- public static final String STAT_AVG_MESSAGE_PROCESSING_TIME = "Notification:ATLAS_HOOK:messageAvgProcessingDuration";
- public static final String STAT_MESSAGES_CONSUMED = "Notification:ATLAS_HOOK:messagesConsumed";
-
- private Map<String, Object> data = new HashMap<>();
-
- public Map<String, Object> getData() {
- return data;
- }
-
- public void setData(Map<String, Object> data) {
- this.data = data;
- }
-
- @Override
- public String toString() {
- return "AtlasStatistics{" +
- "data=" + data +
- '}';
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(data);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- AtlasStatistics other = (AtlasStatistics) o;
-
- return Objects.equals(this.data, other.data);
- }
-}
diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
index c3304cc..6f7c914 100644
--- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
+++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
@@ -36,6 +36,51 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown=true)
public class AtlasMetrics {
+ public static final String PREFIX_CONNECTION_STATUS = "ConnectionStatus:";
+ public static final String PREFIX_NOTIFICATION = "Notification:";
+ public static final String PREFIX_SERVER = "Server:";
+
+ public static final String STAT_CONNECTION_STATUS_BACKEND_STORE = PREFIX_CONNECTION_STATUS + "backendStore";
+ public static final String STAT_CONNECTION_STATUS_INDEX_STORE = PREFIX_CONNECTION_STATUS + "indexStore";
+ public static final String STAT_NOTIFY_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDay";
+ public static final String STAT_NOTIFY_AVG_TIME_CURR_DAY = PREFIX_NOTIFICATION + "currentDayAvgTime";
+ public static final String STAT_NOTIFY_CREATES_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayEntityCreates";
+ public static final String STAT_NOTIFY_UPDATES_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayEntityUpdates";
+ public static final String STAT_NOTIFY_DELETES_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayEntityDeletes";
+ public static final String STAT_NOTIFY_FAILED_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayFailed";
+ public static final String STAT_NOTIFY_START_TIME_CURR_DAY = PREFIX_NOTIFICATION + "currentDayStartTime";
+ public static final String STAT_NOTIFY_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHour";
+ public static final String STAT_NOTIFY_AVG_TIME_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourAvgTime";
+ public static final String STAT_NOTIFY_CREATES_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourEntityCreates";
+ public static final String STAT_NOTIFY_UPDATES_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourEntityUpdates";
+ public static final String STAT_NOTIFY_DELETES_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourEntityDeletes";
+ public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourFailed";
+ public static final String STAT_NOTIFY_START_TIME_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourStartTime";
+ public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME = PREFIX_NOTIFICATION + "lastMessageProcessedTime";
+ public static final String STAT_NOTIFY_START_OFFSET = PREFIX_NOTIFICATION + "offsetStart";
+ public static final String STAT_NOTIFY_CURRENT_OFFSET = PREFIX_NOTIFICATION + "offsetCurrent";
+ public static final String STAT_NOTIFY_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDay";
+ public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY = PREFIX_NOTIFICATION + "previousDayAvgTime";
+ public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityCreates";
+ public static final String STAT_NOTIFY_UPDATES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityUpdates";
+ public static final String STAT_NOTIFY_DELETES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityDeletes";
+ public static final String STAT_NOTIFY_FAILED_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayFailed";
+ public static final String STAT_NOTIFY_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHour";
+ public static final String STAT_NOTIFY_AVG_TIME_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourAvgTime";
+ public static final String STAT_NOTIFY_CREATES_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourEntityCreates";
+ public static final String STAT_NOTIFY_UPDATES_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourEntityUpdates";
+ public static final String STAT_NOTIFY_DELETES_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourEntityDeletes";
+ public static final String STAT_NOTIFY_FAILED_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourFailed";
+ public static final String STAT_NOTIFY_COUNT_TOTAL = PREFIX_NOTIFICATION + "total";
+ public static final String STAT_NOTIFY_AVG_TIME_TOTAL = PREFIX_NOTIFICATION + "totalAvgTime";
+ public static final String STAT_NOTIFY_CREATES_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalCreates";
+ public static final String STAT_NOTIFY_UPDATES_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalUpdates";
+ public static final String STAT_NOTIFY_DELETES_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalDeletes";
+ public static final String STAT_NOTIFY_FAILED_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalFailed";
+ public static final String STAT_SERVER_ACTIVE_TIMESTAMP = PREFIX_SERVER + "activeTimeStamp";
+ public static final String STAT_SERVER_START_TIMESTAMP = PREFIX_SERVER + "startTimeStamp";
+ public static final String STAT_SERVER_UP_TIME = PREFIX_SERVER + "upTime";
+
private Map<String, Map<String, Object>> data;
public AtlasMetrics() {
@@ -63,30 +108,38 @@ public class AtlasMetrics {
@JsonIgnore
public void addMetric(String groupKey, String key, Object value) {
Map<String, Map<String, Object>> data = this.data;
+
if (data == null) {
data = new HashMap<>();
+
+ this.data = data;
}
+
Map<String, Object> metricMap = data.computeIfAbsent(groupKey, k -> new HashMap<>());
+
metricMap.put(key, value);
- setData(data);
}
@JsonIgnore
public Number getNumericMetric(String groupKey, String key) {
Object metric = getMetric(groupKey, key);
+
return metric instanceof Number ? (Number) metric : null;
}
@JsonIgnore
public Object getMetric(String groupKey, String key) {
+ Object ret = null;
Map<String, Map<String, Object>> data = this.data;
- Object ret = null;
+
if (data != null) {
Map<String, Object> metricMap = data.get(groupKey);
+
if (metricMap != null && !metricMap.isEmpty()) {
ret = metricMap.get(key);
}
}
+
return ret;
}
}
diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
index d9ea12a..8fb68e9 100644
--- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java
+++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
@@ -18,15 +18,13 @@
package org.apache.atlas.services;
import org.apache.atlas.annotation.AtlasService;
-import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +51,8 @@ public class MetricsService {
public static final String GENERAL = "general";
// Query names
+ protected static final String METRIC_COLLECTION_TIME = "collectionTime";
+ protected static final String METRIC_STATS = "stats";
protected static final String METRIC_TYPE_COUNT = TYPE + "Count";
protected static final String METRIC_TYPE_UNUSED_COUNT = TYPE + "UnusedCount";
protected static final String METRIC_ENTITY_COUNT = ENTITY + "Count";
@@ -61,114 +61,90 @@ public class MetricsService {
protected static final String METRIC_TAG_COUNT = TAG + "Count";
protected static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities";
- public static final String METRIC_COLLECTION_TIME = "collectionTime";
-
private final AtlasGraph atlasGraph;
private final AtlasTypeRegistry typeRegistry;
- private final StatisticsUtil statisticsUtil;
+ private final AtlasMetricsUtil metricsUtil;
private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix();
@Inject
- public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) {
+ public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) {
this.atlasGraph = graph;
this.typeRegistry = typeRegistry;
- this.statisticsUtil = statisticsUtil;
+ this.metricsUtil = metricsUtil;
}
@SuppressWarnings("unchecked")
public AtlasMetrics getMetrics() {
- AtlasMetrics metrics = new AtlasMetrics();
-
- metrics.addMetric(GENERAL, METRIC_TYPE_COUNT, getAllTypesCount());
- metrics.addMetric(GENERAL, METRIC_TAG_COUNT, getAllTagsCount());
-
- Map<String, Long> activeCountMap = new HashMap<>();
- Map<String, Long> deletedCountMap = new HashMap<>();
-
- // metrics for classifications
+ Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames();
Collection<String> classificationDefNames = typeRegistry.getAllClassificationDefNames();
-
- if (classificationDefNames != null) {
- for (String classificationDefName : classificationDefNames) {
- activeCountMap.put(classificationDefName, getTypeCount(classificationDefName, ACTIVE));
- }
- }
-
- // metrics for entities
- Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames();
+ Map<String, Long> activeEntityCount = new HashMap<>();
+ Map<String, Long> deletedEntityCount = new HashMap<>();
+ Map<String, Long> taggedEntityCount = new HashMap<>();
+ long unusedTypeCount = 0;
+ long totalEntities = 0;
if (entityDefNames != null) {
for (String entityDefName : entityDefNames) {
- activeCountMap.put(entityDefName, getTypeCount(entityDefName, ACTIVE));
- deletedCountMap.put(entityDefName, getTypeCount(entityDefName, DELETED));
+ long activeCount = getTypeCount(entityDefName, ACTIVE);
+ long deletedCount = getTypeCount(entityDefName, DELETED);
+
+ if (activeCount > 0) {
+ activeEntityCount.put(entityDefName, activeCount);
+ totalEntities += activeCount;
+ }
+
+ if (deletedCount > 0) {
+ deletedEntityCount.put(entityDefName, deletedCount);
+ totalEntities += deletedCount;
+ }
+
+ if (activeCount == 0 && deletedCount == 0) {
+ unusedTypeCount++;
+ }
}
}
- Map<String, Long> activeEntityCount = new HashMap<>();
- Map<String, Long> deletedEntityCount = new HashMap<>();
- long unusedTypeCount = 0;
- long totalEntities = 0;
-
- for (String entityDefName : typeRegistry.getAllEntityDefNames()) {
- Long activeCount = activeCountMap.get(entityDefName);
- Long deletedCount = deletedCountMap.get(entityDefName);
-
- if (activeCount > 0) {
- activeEntityCount.put(entityDefName, activeCount);
- totalEntities += activeCount.longValue();
- }
-
- if (deletedCount > 0) {
- deletedEntityCount.put(entityDefName, deletedCount);
- totalEntities += deletedCount.longValue();
- }
+ if (classificationDefNames != null) {
+ for (String classificationDefName : classificationDefNames) {
+ long count = getTypeCount(classificationDefName, ACTIVE);
- if (activeCount == 0 && deletedCount == 0) {
- unusedTypeCount++;
+ if (count > 0) {
+ taggedEntityCount.put(classificationDefName, count);
+ }
}
}
+ AtlasMetrics metrics = new AtlasMetrics();
+
+ metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, System.currentTimeMillis());
+ metrics.addMetric(GENERAL, METRIC_STATS, metricsUtil.getStats()); //add atlas server stats
+ metrics.addMetric(GENERAL, METRIC_TYPE_COUNT, getAllTypesCount());
+ metrics.addMetric(GENERAL, METRIC_TAG_COUNT, getAllTagsCount());
metrics.addMetric(GENERAL, METRIC_TYPE_UNUSED_COUNT, unusedTypeCount);
metrics.addMetric(GENERAL, METRIC_ENTITY_COUNT, totalEntities);
+
metrics.addMetric(ENTITY, METRIC_ENTITY_ACTIVE, activeEntityCount);
metrics.addMetric(ENTITY, METRIC_ENTITY_DELETED, deletedEntityCount);
- Map<String, Long> taggedEntityCount = new HashMap<>();
-
- for (String classificationName : typeRegistry.getAllClassificationDefNames()) {
- Long count = activeCountMap.get(classificationName);
-
- if (count > 0) {
- taggedEntityCount.put(classificationName, count);
- }
- }
-
metrics.addMetric(TAG, METRIC_ENTITIES_PER_TAG, taggedEntityCount);
- // Miscellaneous metrics
- long collectionTime = System.currentTimeMillis();
-
- metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, collectionTime);
-
- //add atlas server stats
- Map<String, Object> statistics = statisticsUtil.getAtlasStatistics();
- metrics.addMetric(GENERAL, "stats", statistics);
-
return metrics;
}
- private Long getTypeCount(String typeName, Status status) {
+ private long getTypeCount(String typeName, Status status) {
+ Long ret = null;
String indexQuery = indexSearchPrefix + "\"" + ENTITY_TYPE_PROPERTY_KEY + "\" : (%s)" + AND_STR +
indexSearchPrefix + "\"" + STATE_PROPERTY_KEY + "\" : (%s)";
indexQuery = String.format(indexQuery, typeName, status.name());
try {
- return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals();
+ ret = atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals();
}catch (Exception e){
LOG.error("Failed fetching using indexQuery: " + e.getMessage());
}
- return 0l;
+
+ return ret == null ? 0L : ret;
}
private int getAllTypesCount() {
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
new file mode 100644
index 0000000..acf9e34
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.util;
+
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+
+import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
+
+public class AtlasMetricsCounter {
+ public enum Period { ALL, CURR_DAY, CURR_HOUR, PREV_HOUR, PREV_DAY };
+
+ private final String name;
+ private final Stats stats;
+ private Clock clock;
+ private Instant lastIncrTime;
+ private Instant dayStartTime;
+ private Instant dayEndTime;
+ private Instant hourStartTime;
+ private Instant hourEndTime;
+
+ public AtlasMetricsCounter(String name) {
+ this(name, Clock.systemUTC());
+ }
+
+ public AtlasMetricsCounter(String name, Clock clock) {
+ this.name = name;
+ this.stats = new Stats();
+
+ init(clock);
+ }
+
+ public String getName() { return name; }
+
+ public Instant getLastIncrTime() { return lastIncrTime; }
+
+ public void incr() {
+ incrByWithMeasure(1, 0);
+ }
+
+ public void incrBy(long count) {
+ incrByWithMeasure(count, 0);
+ }
+
+ public void incrWithMeasure(long measure) {
+ incrByWithMeasure(1, measure);
+ }
+
+ public void incrByWithMeasure(long count, long measure) {
+ Instant instant = clock.instant();
+
+ stats.addCount(ALL, count);
+ stats.addMeasure(ALL, measure);
+
+ if (instant.isAfter(dayStartTime)) { // ignore times earlier than start of current day
+ lastIncrTime = instant;
+
+ updateForTime(instant);
+
+ stats.addCount(CURR_DAY, count);
+ stats.addMeasure(CURR_DAY, measure);
+
+ if (instant.isAfter(hourStartTime)) { // ignore times earlier than start of current hour
+ stats.addCount(CURR_HOUR, count);
+ stats.addMeasure(CURR_HOUR, measure);
+ }
+ }
+ }
+
+ public Stats report() {
+ updateForTime(clock.instant());
+
+ return new Stats(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli());
+ }
+
+ // visible only for testing
+ void init(Clock clock) {
+ this.clock = clock;
+ this.lastIncrTime = Instant.ofEpochSecond(0);
+ this.dayStartTime = Instant.ofEpochSecond(0);
+ this.dayEndTime = Instant.ofEpochSecond(0);
+ this.hourStartTime = Instant.ofEpochSecond(0);
+ this.hourEndTime = Instant.ofEpochSecond(0);
+
+ updateForTime(clock.instant());
+ }
+
+ protected void updateForTime(Instant instant) {
+ if (instant.isAfter(dayEndTime)) {
+ rolloverDay(instant);
+ rolloverHour(instant);
+ } else if (instant.isAfter(hourEndTime)) {
+ rolloverHour(instant);
+ }
+ }
+
+ protected void rolloverDay(Instant instant) {
+ Instant dayStartTime = getDayStartTime(instant);
+
+ if (dayStartTime.equals(dayEndTime)) {
+ stats.copy(CURR_DAY, PREV_DAY);
+ } else {
+ stats.reset(PREV_DAY);
+ }
+
+ stats.reset(CURR_DAY);
+
+ this.dayStartTime = dayStartTime;
+ this.dayEndTime = getNextDayStartTime(instant);
+ }
+
+ protected void rolloverHour(Instant instant) {
+ Instant hourStartTime = getHourStartTime(instant);
+
+ if (hourStartTime.equals(hourEndTime)) {
+ stats.copy(CURR_HOUR, PREV_HOUR);
+ } else {
+ stats.reset(PREV_HOUR);
+ }
+
+ stats.reset(CURR_HOUR);
+
+ this.hourStartTime = hourStartTime;
+ this.hourEndTime = getNextHourStartTime(instant);
+ }
+
+ public static LocalDateTime getLocalDateTime(Instant instant) {
+ return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+ }
+
+ public static Instant getHourStartTime(Instant instant) {
+ LocalDateTime time = getLocalDateTime(instant);
+
+ return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour()).toInstant(ZoneOffset.UTC);
+ }
+
+ public static Instant getNextHourStartTime(Instant instant) {
+ LocalDateTime time = getLocalDateTime(instant);
+
+ return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour() + 1).toInstant(ZoneOffset.UTC);
+ }
+
+ public static Instant getDayStartTime(Instant instant) {
+ LocalDateTime time = getLocalDateTime(instant);
+
+ return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).toInstant(ZoneOffset.UTC);
+ }
+
+ public static Instant getNextDayStartTime(Instant instant) {
+ LocalDateTime time = getLocalDateTime(instant);
+
+ return LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.MIN).toInstant(ZoneOffset.UTC);
+ }
+
+
+ public static class Stats {
+ private static final int NUM_PERIOD = Period.values().length;
+
+ private final long dayStartTimeMs;
+ private final long hourStartTimeMs;
+ private final long[] count = new long[NUM_PERIOD];
+ private final long[] measureSum = new long[NUM_PERIOD];
+ private final long[] measureMin = new long[NUM_PERIOD];
+ private final long[] measureMax = new long[NUM_PERIOD];
+
+
+ public Stats() {
+ dayStartTimeMs = 0;
+ hourStartTimeMs = 0;
+
+ for (Period period : Period.values()) {
+ reset(period);
+ }
+ }
+
+ public Stats(Stats other, long dayStartTimeMs, long hourStartTimeMs) {
+ this.dayStartTimeMs = dayStartTimeMs;
+ this.hourStartTimeMs = hourStartTimeMs;
+
+ copy(other.count, this.count);
+ copy(other.measureSum, this.measureSum);
+ copy(other.measureMin, this.measureMin);
+ copy(other.measureMax, this.measureMax);
+ }
+
+ public long getDayStartTimeMs() { return dayStartTimeMs; }
+
+ public long getHourStartTimeMs() { return hourStartTimeMs; }
+
+ public long getCount(Period period) { return count[period.ordinal()]; }
+
+ public long getMeasureSum(Period period) { return measureSum[period.ordinal()]; }
+
+ public long getMeasureMin(Period period) { return measureMin[period.ordinal()]; }
+
+ public long getMeasureMax(Period period) { return measureMax[period.ordinal()]; }
+
+ public long getMeasureAvg(Period period) {
+ int idx = period.ordinal();
+ long c = count[idx];
+
+ return c != 0 ? (measureSum[idx] / c) : 0;
+ }
+
+ public void addCount(Period period, long num) {
+ count[period.ordinal()] += num;
+ }
+
+ public void addMeasure(Period period, long measure) {
+ int idx = period.ordinal();
+
+ measureSum[idx] += measure;
+
+ if (measureMin[idx] > measure) {
+ measureMin[idx] = measure;
+ }
+
+ if (measureMax[idx] < measure) {
+ measureMax[idx] = measure;
+ }
+ }
+
+ private void copy(Period src, Period dest) {
+ int srcIdx = src.ordinal();
+ int destIdx = dest.ordinal();
+
+ count[destIdx] = count[srcIdx];
+ measureSum[destIdx] = measureSum[srcIdx];
+ measureMin[destIdx] = measureMin[srcIdx];
+ measureMax[destIdx] = measureMax[srcIdx];
+ }
+
+ private void reset(Period period) {
+ int idx = period.ordinal();
+
+ count[idx] = 0;
+ measureSum[idx] = 0;
+ measureMin[idx] = Long.MAX_VALUE;
+ measureMax[idx] = Long.MIN_VALUE;
+ }
+
+ private void copy(long[] src, long[] dest) {
+ for (int i = 0; i < dest.length; i++) {
+ dest[i] = src[i];
+ }
+ }
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
new file mode 100644
index 0000000..c41e6bd
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.util;
+
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.util.AtlasMetricsCounter.Stats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.time.Clock;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.apache.atlas.model.metrics.AtlasMetrics.*;
+import static org.apache.atlas.repository.Constants.TYPE_NAME_INTERNAL;
+import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
+import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
+
+@Component
+public class AtlasMetricsUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasMetricsUtil.class);
+
+ private static final long SEC_MS = 1000;
+ private static final long MIN_MS = 60 * SEC_MS;
+ private static final long HOUR_MS = 60 * MIN_MS;
+ private static final long DAY_MS = 24 * HOUR_MS;
+ private static final String STATUS_CONNECTED = "connected";
+ private static final String STATUS_NOT_CONNECTED = "not-connected";
+
+ private final AtlasGraph graph;
+ private long serverStartTime = 0;
+ private long serverActiveTime = 0;
+ private long msgOffsetStart = -1;
+ private long msgOffsetCurrent = 0;
+ private final AtlasMetricsCounter messagesProcessed = new AtlasMetricsCounter("messagesProcessed");
+ private final AtlasMetricsCounter messagesFailed = new AtlasMetricsCounter("messagesFailed");
+ private final AtlasMetricsCounter entityCreates = new AtlasMetricsCounter("entityCreates");
+ private final AtlasMetricsCounter entityUpdates = new AtlasMetricsCounter("entityUpdates");
+ private final AtlasMetricsCounter entityDeletes = new AtlasMetricsCounter("entityDeletes");
+
+ @Inject
+ public AtlasMetricsUtil(AtlasGraph graph) {
+ this.graph = graph; // queried by getHBaseStatus() and getSolrStatus() to probe the stores
+ }
+
+ // visible only for testing: hands the given clock to init() of every notification counter
+ public void init(Clock clock) {
+ messagesProcessed.init(clock);
+ messagesFailed.init(clock);
+ entityCreates.init(clock);
+ entityUpdates.init(clock);
+ entityDeletes.init(clock);
+ }
+
+ public void onServerStart() {
+ serverStartTime = System.currentTimeMillis(); // epoch millis; getStats() reports it and derives the up-time from it
+ }
+
+ public void onServerActivation() {
+ serverActiveTime = System.currentTimeMillis(); // epoch millis, reported unchanged by getStats()
+ }
+
+ public void onNotificationProcessingComplete(long msgOffset, NotificationStat stats) {
+ messagesProcessed.incrWithMeasure(stats.timeTakenMs); // counted for every message, failed or not
+ entityCreates.incrBy(stats.entityCreates);
+ entityUpdates.incrBy(stats.entityUpdates);
+ entityDeletes.incrBy(stats.entityDeletes);
+
+ if (stats.isFailedMsg) {
+ messagesFailed.incr();
+ }
+ // -1 means no offset has been recorded yet; the first offset seen becomes the start offset
+ if (msgOffsetStart == -1) {
+ msgOffsetStart = msgOffset;
+ }
+ // the current offset is one past the message that was just handled
+ msgOffsetCurrent = msgOffset + 1;
+ }
+
+ public Map<String, Object> getStats() {
+ Map<String, Object> ret = new HashMap<>();
+ // take one report() per counter; these locals shadow the counter fields of the same name
+ Stats messagesProcessed = this.messagesProcessed.report();
+ Stats messagesFailed = this.messagesFailed.report();
+ Stats entityCreates = this.entityCreates.report();
+ Stats entityUpdates = this.entityUpdates.report();
+ Stats entityDeletes = this.entityDeletes.report();
+ // server times, store connectivity (each probe may wait up to 10 seconds) and notification offsets
+ ret.put(STAT_SERVER_START_TIMESTAMP, serverStartTime);
+ ret.put(STAT_SERVER_ACTIVE_TIMESTAMP, serverActiveTime);
+ ret.put(STAT_SERVER_UP_TIME, millisToTimeDiff(System.currentTimeMillis() - serverStartTime));
+ ret.put(STAT_CONNECTION_STATUS_BACKEND_STORE, getHBaseStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);
+ ret.put(STAT_CONNECTION_STATUS_INDEX_STORE, getSolrStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED);
+ ret.put(STAT_NOTIFY_START_OFFSET, msgOffsetStart);
+ ret.put(STAT_NOTIFY_CURRENT_OFFSET, msgOffsetCurrent);
+ ret.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME, this.messagesProcessed.getLastIncrTime().toEpochMilli());
+ // totals (Period ALL)
+ ret.put(STAT_NOTIFY_COUNT_TOTAL, messagesProcessed.getCount(ALL));
+ ret.put(STAT_NOTIFY_AVG_TIME_TOTAL, messagesProcessed.getMeasureAvg(ALL));
+ ret.put(STAT_NOTIFY_FAILED_COUNT_TOTAL, messagesFailed.getCount(ALL));
+ ret.put(STAT_NOTIFY_CREATES_COUNT_TOTAL, entityCreates.getCount(ALL));
+ ret.put(STAT_NOTIFY_UPDATES_COUNT_TOTAL, entityUpdates.getCount(ALL));
+ ret.put(STAT_NOTIFY_DELETES_COUNT_TOTAL, entityDeletes.getCount(ALL));
+ // current day
+ ret.put(STAT_NOTIFY_START_TIME_CURR_DAY, messagesProcessed.getDayStartTimeMs());
+ ret.put(STAT_NOTIFY_COUNT_CURR_DAY, messagesProcessed.getCount(CURR_DAY));
+ ret.put(STAT_NOTIFY_AVG_TIME_CURR_DAY, messagesProcessed.getMeasureAvg(CURR_DAY));
+ ret.put(STAT_NOTIFY_FAILED_COUNT_CURR_DAY, messagesFailed.getCount(CURR_DAY));
+ ret.put(STAT_NOTIFY_CREATES_COUNT_CURR_DAY, entityCreates.getCount(CURR_DAY));
+ ret.put(STAT_NOTIFY_UPDATES_COUNT_CURR_DAY, entityUpdates.getCount(CURR_DAY));
+ ret.put(STAT_NOTIFY_DELETES_COUNT_CURR_DAY, entityDeletes.getCount(CURR_DAY));
+ // current hour
+ ret.put(STAT_NOTIFY_START_TIME_CURR_HOUR, messagesProcessed.getHourStartTimeMs());
+ ret.put(STAT_NOTIFY_COUNT_CURR_HOUR, messagesProcessed.getCount(CURR_HOUR));
+ ret.put(STAT_NOTIFY_AVG_TIME_CURR_HOUR, messagesProcessed.getMeasureAvg(CURR_HOUR));
+ ret.put(STAT_NOTIFY_FAILED_COUNT_CURR_HOUR, messagesFailed.getCount(CURR_HOUR));
+ ret.put(STAT_NOTIFY_CREATES_COUNT_CURR_HOUR, entityCreates.getCount(CURR_HOUR));
+ ret.put(STAT_NOTIFY_UPDATES_COUNT_CURR_HOUR, entityUpdates.getCount(CURR_HOUR));
+ ret.put(STAT_NOTIFY_DELETES_COUNT_CURR_HOUR, entityDeletes.getCount(CURR_HOUR));
+ // previous hour
+ ret.put(STAT_NOTIFY_COUNT_PREV_HOUR, messagesProcessed.getCount(PREV_HOUR));
+ ret.put(STAT_NOTIFY_AVG_TIME_PREV_HOUR, messagesProcessed.getMeasureAvg(PREV_HOUR));
+ ret.put(STAT_NOTIFY_FAILED_COUNT_PREV_HOUR, messagesFailed.getCount(PREV_HOUR));
+ ret.put(STAT_NOTIFY_CREATES_COUNT_PREV_HOUR, entityCreates.getCount(PREV_HOUR));
+ ret.put(STAT_NOTIFY_UPDATES_COUNT_PREV_HOUR, entityUpdates.getCount(PREV_HOUR));
+ ret.put(STAT_NOTIFY_DELETES_COUNT_PREV_HOUR, entityDeletes.getCount(PREV_HOUR));
+ // previous day
+ ret.put(STAT_NOTIFY_COUNT_PREV_DAY, messagesProcessed.getCount(PREV_DAY));
+ ret.put(STAT_NOTIFY_AVG_TIME_PREV_DAY, messagesProcessed.getMeasureAvg(PREV_DAY));
+ ret.put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY, messagesFailed.getCount(PREV_DAY));
+ ret.put(STAT_NOTIFY_CREATES_COUNT_PREV_DAY, entityCreates.getCount(PREV_DAY));
+ ret.put(STAT_NOTIFY_UPDATES_COUNT_PREV_DAY, entityUpdates.getCount(PREV_DAY));
+ ret.put(STAT_NOTIFY_DELETES_COUNT_PREV_DAY, entityDeletes.getCount(PREV_DAY));
+
+ return ret;
+ }
+
+ private boolean getHBaseStatus(){
+ try { // backend-store probe: any failure, or no answer within 10 seconds, means "not connected"
+ runWithTimeout(new Runnable() {
+ @Override
+ public void run() {
+ graph.query().has(TYPE_NAME_PROPERTY_KEY, TYPE_NAME_INTERNAL).vertices(1); // result is discarded
+ }
+ }, 10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("backend store status check failed", e); // log the throwable itself: a TimeoutException has no message
+ return false;
+ }
+ // the probe query completed within the timeout without throwing
+ return true;
+ }
+
+ private boolean getSolrStatus(){
+ final String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + Constants.TYPE_NAME_PROPERTY_KEY + "\":(" + TYPE_NAME_INTERNAL + ")";
+ // index-store probe: any failure, or no answer within 10 seconds, means "not connected"
+ try {
+ runWithTimeout(new Runnable() {
+ @Override
+ public void run() {
+ graph.indexQuery(Constants.VERTEX_INDEX, query).vertices(0, 1);
+ }
+ }, 10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("index store status check failed", e); // log the throwable itself: a TimeoutException has no message
+ return false;
+ }
+
+ return true;
+ }
+
+ private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception {
+ // wrap the Runnable in a Callable that yields null so the Callable overload,
+ // which owns the executor and the timeout handling, can be reused
+ final Callable<Object> task = () -> {
+ runnable.run();
+ return null;
+ };
+ runWithTimeout(task, timeout, timeUnit);
+ }
+
+ private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
+ // runs the callable on a dedicated single-thread executor and waits at most the given time for its result
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ final Future<T> future = executor.submit(callable);
+ executor.shutdown(); // accepts no further tasks; the task submitted above still runs
+
+ try {
+ return future.get(timeout, timeUnit);
+ } catch (TimeoutException | InterruptedException e) {
+ future.cancel(true); // give up on the task and interrupt its worker thread
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt(); // keep the interrupt visible to callers that catch Exception
+ }
+ throw e;
+ } catch (ExecutionException e) {
+ // surface the task's own failure instead of the ExecutionException wrapper
+ Throwable t = e.getCause();
+ if (t instanceof Error) {
+ throw (Error) t;
+ }
+ // a cause that is neither an Error nor an Exception is reported as IllegalStateException
+ throw (t instanceof Exception) ? (Exception) t : new IllegalStateException(t);
+ }
+ }
+
+ private String millisToTimeDiff(long msDiff) {
+ StringBuilder sb = new StringBuilder();
+ // split the duration into whole days plus the remaining hours, minutes and seconds
+ long diffSeconds = msDiff / SEC_MS % 60;
+ long diffMinutes = msDiff / MIN_MS % 60;
+ long diffHours = msDiff / HOUR_MS % 24;
+ long diffDays = msDiff / DAY_MS;
+
+ if (diffDays > 0) sb.append(diffDays).append(" day ");
+ if (diffHours > 0) sb.append(diffHours).append(" hour ");
+ if (diffMinutes > 0) sb.append(diffMinutes).append(" min ");
+ if (diffSeconds > 0) sb.append(diffSeconds).append(" sec");
+ // units with value 0 are skipped: drop the separator left after the last unit, and report "0 sec" rather than an empty string
+ return sb.length() > 0 ? sb.toString().trim() : "0 sec";
+ }
+
+ public static class NotificationStat { // per-message stats passed to onNotificationProcessingComplete()
+ public boolean isFailedMsg = false;
+ public long timeTakenMs = 0;
+ public int entityCreates = 0;
+ public int entityUpdates = 0;
+ public int entityDeletes = 0;
+
+ public NotificationStat() { }
+
+ public NotificationStat(boolean isFailedMsg, long timeTakenMs) {
+ this.isFailedMsg = isFailedMsg;
+ this.timeTakenMs = timeTakenMs;
+ }
+ // adds the entity counts from one mutation response; partial updates count as updates
+ public void updateStats(EntityMutationResponse response) {
+ if (response == null) { return; } // nothing to count; metrics bookkeeping must not fail the caller
+ entityCreates += getSize(response.getCreatedEntities());
+ entityUpdates += getSize(response.getUpdatedEntities()) + getSize(response.getPartialUpdatedEntities());
+ entityDeletes += getSize(response.getDeletedEntities());
+ }
+ // null-safe size; Collection<?> replaces the raw Collection type
+ private int getSize(Collection<?> collection) {
+ return collection != null ? collection.size() : 0;
+ }
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java b/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java
deleted file mode 100644
index efb804b..0000000
--- a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.util;
-
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.AtlasStatistics;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.springframework.stereotype.Component;
-
-import javax.inject.Inject;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Locale;
-import java.util.concurrent.*;
-
-import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_START_TS;
-import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_ACTIVE_TS;
-import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_UP_SINCE;
-import static org.apache.atlas.model.AtlasStatistics.STAT_START_OFFSET;
-import static org.apache.atlas.model.AtlasStatistics.STAT_CURRENT_OFFSET;
-import static org.apache.atlas.model.AtlasStatistics.STAT_SOLR_STATUS;
-import static org.apache.atlas.model.AtlasStatistics.STAT_HBASE_STATUS;
-import static org.apache.atlas.model.AtlasStatistics.STAT_LAST_MESSAGE_PROCESSED_TIME_TS;
-import static org.apache.atlas.model.AtlasStatistics.STAT_AVG_MESSAGE_PROCESSING_TIME;
-import static org.apache.atlas.model.AtlasStatistics.STAT_MESSAGES_CONSUMED;
-
-@Component
-public class StatisticsUtil {
- private static final Logger LOG = LoggerFactory.getLogger(StatisticsUtil.class);
-
- private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("d MMM, yyyy : hh:mm aaa z");
-
- private static final long DAY = 1000 * 60 * 60 * 24;
- private static final long HOUR = 1000 * 60 * 60;
- private static final long MIN = 1000 * 60;
- private static final long SEC = 1000;
-
- private final AtlasGraph graph;
- private final String STATUS_CONNECTED = "connected";
- private final String STATUS_NOT_CONNECTED = "not-connected";
- private final AtlasStatistics atlasStatistics;
-
- private long countMsgProcessed = 0;
- private long totalMsgProcessingTimeMs = 0;
- private Locale locale = new Locale("en", "US");
- private NumberFormat numberFormat;
-
- @Inject
- public StatisticsUtil(AtlasGraph graph) {
- this.graph = graph;
- this.atlasStatistics = new AtlasStatistics();
- numberFormat = NumberFormat.getInstance(locale);
- }
-
- public Map<String, Object> getAtlasStatistics() {
- Map<String, Object> statisticsMap = new HashMap<>();
- statisticsMap.putAll(atlasStatistics.getData());
-
- statisticsMap.put(STAT_HBASE_STATUS, getHBaseStatus());
- statisticsMap.put(STAT_SOLR_STATUS, getSolrStatus());
- statisticsMap.put(STAT_SERVER_UP_SINCE, getUpSinceTime());
- if(countMsgProcessed > 0) {
- statisticsMap.put(STAT_MESSAGES_CONSUMED, countMsgProcessed);
- }
- formatStatistics(statisticsMap);
-
- return statisticsMap;
- }
-
- public void setKafkaOffsets(long value) {
- if (Long.parseLong(getStat(STAT_START_OFFSET).toString()) == -1) {
- addStat(STAT_START_OFFSET, value);
- }
- addStat(STAT_CURRENT_OFFSET, ++value);
- }
-
- public void setAvgMsgProcessingTime(long value) {
- countMsgProcessed++;
- totalMsgProcessingTimeMs += value;
- value = totalMsgProcessingTimeMs / countMsgProcessed;
-
- addStat(STAT_AVG_MESSAGE_PROCESSING_TIME, value);
- }
-
- public void setLastMsgProcessedTime() {
- addStat(STAT_LAST_MESSAGE_PROCESSED_TIME_TS, System.currentTimeMillis());
- }
-
- public void setServerStartTime() {
- addStat(STAT_SERVER_START_TS, System.currentTimeMillis());
- }
-
- public void setServerActiveTime() {
- addStat(STAT_SERVER_ACTIVE_TS, System.currentTimeMillis());
- }
-
-
- private void addStat(String key, Object value) {
- Map<String, Object> data = atlasStatistics.getData();
- if (data == null) {
- data = new HashMap<>();
- }
- data.put(key, value);
- atlasStatistics.setData(data);
- }
-
- private Object getStat(String key) {
- Map<String, Object> data = atlasStatistics.getData();
- Object ret = data.get(key);
- if (ret == null) {
- return -1;
- }
- return ret;
- }
-
- private void formatStatistics(Map<String, Object> statisticsMap) {
- for (Map.Entry<String, Object> stat : statisticsMap.entrySet()) {
- switch (stat.getKey()) {
- case STAT_SERVER_UP_SINCE:
- statisticsMap.put(stat.getKey(), millisToTimeDiff(Long.parseLong(stat.getValue().toString())));
- break;
-
- case STAT_LAST_MESSAGE_PROCESSED_TIME_TS:
- statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
- break;
-
- case STAT_SERVER_START_TS:
- case STAT_SERVER_ACTIVE_TS:
- statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
- break;
-
- case STAT_AVG_MESSAGE_PROCESSING_TIME:
- statisticsMap.put(stat.getKey(), formatNumber(Long.parseLong(stat.getValue().toString())) + " milliseconds");
- break;
-
- case STAT_HBASE_STATUS:
- case STAT_SOLR_STATUS:
- String curState = ((boolean) stat.getValue()) ? STATUS_CONNECTED : STATUS_NOT_CONNECTED;
- statisticsMap.put(stat.getKey(), curState);
- break;
-
- case STAT_MESSAGES_CONSUMED:
- case STAT_START_OFFSET:
- case STAT_CURRENT_OFFSET:
- statisticsMap.put(stat.getKey(), formatNumber(Long.parseLong(stat.getValue().toString())));
- break;
-
- default:
- statisticsMap.put(stat.getKey(), stat.getValue());
- }
- }
- }
-
- private boolean getHBaseStatus() {
-
- String query = "g.V().next()";
- try {
- runWithTimeout(new Runnable() {
- @Override
- public void run() {
- try {
- graph.executeGremlinScript(query, false);
- } catch (AtlasBaseException e) {
- LOG.error(e.getMessage());
- }
- }
- }, 10, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error(e.getMessage());
- return false;
- }
-
- return true;
- }
-
- private boolean getSolrStatus() {
- String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + "__type.name\"" + " : (*)";
- try {
- runWithTimeout(new Runnable() {
- @Override
- public void run() {
- graph.indexQuery(Constants.VERTEX_INDEX, query).vertexTotals();
- }
- }, 10, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error(e.getMessage());
- return false;
- }
- return true;
- }
-
- private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception {
- runWithTimeout(new Callable<Object>() {
- @Override
- public Object call() {
- runnable.run();
- return null;
- }
- }, timeout, timeUnit);
- }
-
- private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- final Future<T> future = executor.submit(callable);
- executor.shutdown();
- try {
- return future.get(timeout, timeUnit);
- } catch (TimeoutException e) {
- future.cancel(true);
- throw e;
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof Error) {
- throw (Error) t;
- } else if (t instanceof Exception) {
- throw (Exception) t;
- } else {
- throw new IllegalStateException(t);
- }
- }
- }
-
- private long getUpSinceTime() {
- long upTS = Long.parseLong(getStat(STAT_SERVER_START_TS).toString());
- return System.currentTimeMillis() - upTS;
- }
-
- private String millisToTimeDiff(long msDiff) {
- StringBuilder sb = new StringBuilder();
-
- long diffSeconds = msDiff / SEC % 60;
- long diffMinutes = msDiff / MIN % 60;
- long diffHours = msDiff / HOUR % 24;
- long diffDays = msDiff / DAY;
-
- if (diffDays > 0) sb.append(diffDays).append(" day ");
- if (diffHours > 0) sb.append(diffHours).append(" hour ");
- if (diffMinutes > 0) sb.append(diffMinutes).append(" min ");
- if (diffSeconds > 0) sb.append(diffSeconds).append(" sec");
-
- return sb.toString();
- }
-
- private String millisToTimeStamp(long ms) {
- return simpleDateFormat.format(ms);
- }
-
- private String formatNumber(long value) {
- return numberFormat.format(value);
- }
-
-}
diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
index 78e5803..64698c2 100644
--- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -28,6 +28,8 @@ import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.AtlasMetricsCounter;
+import org.apache.atlas.util.AtlasMetricsUtil;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -37,10 +39,15 @@ import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.model.metrics.AtlasMetrics.*;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.services.MetricsService.ENTITY;
@@ -53,11 +60,11 @@ import static org.apache.atlas.services.MetricsService.METRIC_TAG_COUNT;
import static org.apache.atlas.services.MetricsService.METRIC_TYPE_COUNT;
import static org.apache.atlas.services.MetricsService.METRIC_TYPE_UNUSED_COUNT;
import static org.apache.atlas.services.MetricsService.TAG;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.*;
@Guice(modules = TestModules.TestOnlyModule.class)
public class MetricsServiceTest {
+
public static final String IMPORT_FILE = "metrics-entities-data.zip";
@Inject
@@ -72,6 +79,14 @@ public class MetricsServiceTest {
@Inject
private MetricsService metricsService;
+ @Inject
+ private AtlasMetricsUtil metricsUtil;
+
+ TestClock clock = new TestClock(Clock.systemUTC(), ZoneOffset.UTC);
+
+ long msgOffset = 0;
+
+
private final Map<String, Long> activeEntityMetricsExpected = new HashMap<String, Long>() {{
put("hive_storagedesc", 5L);
put("__ExportImportAuditEntry", 1L);
@@ -95,6 +110,17 @@ public class MetricsServiceTest {
put("PII", 1L);
}};
+ private final Map<String, Object> metricExpected = new HashMap<String, Object>() {{
+ put(STAT_NOTIFY_COUNT_CURR_HOUR, 11L);
+ put(STAT_NOTIFY_FAILED_COUNT_CURR_HOUR, 1L);
+ put(STAT_NOTIFY_COUNT_PREV_HOUR, 11L);
+ put(STAT_NOTIFY_FAILED_COUNT_PREV_HOUR, 1L);
+ put(STAT_NOTIFY_COUNT_CURR_DAY, 33L);
+ put(STAT_NOTIFY_FAILED_COUNT_CURR_DAY, 3L);
+ put(STAT_NOTIFY_COUNT_PREV_DAY, 11L);
+ put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY, 1L);
+ }};
+
@BeforeClass
public void setup() {
RequestContext.clear();
@@ -148,6 +174,24 @@ public class MetricsServiceTest {
assertEquals(deletedEntityMetricsActual, deletedEntityMetricsExpected);
}
+ @Test
+ public void testNotificationMetrics() {
+ Instant now = Clock.systemUTC().instant();
+ Instant dayStartTime = AtlasMetricsCounter.getDayStartTime(now);
+ Instant dayEndTime = AtlasMetricsCounter.getNextDayStartTime(now);
+ Instant hourStartTime = dayEndTime.minusSeconds(60 * 60);
+ // hourStartTime is one hour before the end of today; feed messages around the day and hour boundaries
+ prepareNotificationData(dayStartTime, hourStartTime);
+ // read the stats with the clock pinned one second before the end of today
+ clock.setInstant(dayEndTime.minusSeconds(1));
+
+ Map<String, Object> notificationMetricMap = metricsUtil.getStats();
+ // null makes TestClock fall back to its base clock
+ clock.setInstant(null);
+
+ verifyNotificationMetric(metricExpected, notificationMetricMap);
+ }
+
private void loadModelFilesAndImportTestData() {
try {
@@ -165,8 +209,75 @@ public class MetricsServiceTest {
}
}
+ private void prepareNotificationData(Instant dayStartTime, Instant hourStartTime) {
+ Instant prevDayStartTime = AtlasMetricsCounter.getDayStartTime(dayStartTime.minusSeconds(1));
+
+ msgOffset = 0;
+ // initialize the counters while the clock is pinned to prevDayStartTime, then release the clock
+ clock.setInstant(prevDayStartTime);
+ metricsUtil.init(clock);
+ clock.setInstant(null);
+ // one batch of messages (see processMessage) at each of four points in time
+ processMessage(prevDayStartTime.plusSeconds(3)); // yesterday
+ processMessage(dayStartTime.plusSeconds(3)); // today
+ processMessage(hourStartTime.minusSeconds(3)); // past hour
+ processMessage(hourStartTime.plusSeconds(3)); // this hour
+ }
+
+ private void processMessage(Instant instant) {
+ clock.setInstant(instant);
+ // one failed message followed by ten successful ones; post-increment gives each its own consecutive offset
+ metricsUtil.onNotificationProcessingComplete(msgOffset++, new AtlasMetricsUtil.NotificationStat(true, 1));
+
+ for (int i = 0; i < 10; i++) {
+ metricsUtil.onNotificationProcessingComplete(msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1));
+ }
+ // release the pinned time so the clock follows its base clock again
+ clock.setInstant(null);
+ }
+
+ private void verifyNotificationMetric(Map<String, Object> metricExpected, Map<String, Object> notificationMetrics) {
+ assertNotNull(notificationMetrics);
+ assertNotEquals(notificationMetrics.size(), 0);
+ assertTrue(notificationMetrics.size() >= metricExpected.size());
+ // only the expected keys are compared; the metric name doubles as the assertion failure message
+ for (String metricName : metricExpected.keySet()) {
+ assertEquals(notificationMetrics.get(metricName), metricExpected.get(metricName), metricName);
+ }
+ }
+
public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException {
FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);
return new ZipSource(fs);
}
+
+ private static class TestClock extends Clock { // Clock whose time can be pinned through setInstant()
+ private final Clock baseClock;
+ private final ZoneId zone;
+ private Instant instant = null; // pinned time; null means "use baseClock"
+
+ public TestClock(Clock baseClock, ZoneId zone) {
+ this.baseClock = baseClock;
+ this.zone = zone;
+ }
+
+ @Override
+ public ZoneId getZone() {
+ return zone;
+ }
+ // NOTE(review): the copy starts without a pinned instant and does not see later setInstant() calls on this clock
+ @Override
+ public TestClock withZone(ZoneId zone) {
+ return new TestClock(baseClock, zone);
+ }
+ // the pinned instant while one is set, otherwise the base clock's current instant
+ @Override
+ public Instant instant() {
+ return instant != null ? instant : baseClock.instant();
+ }
+ // pass null to go back to the base clock
+ public void setInstant(Instant instant) {
+ this.instant = instant;
+ }
+ }
}
\ No newline at end of file
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 8430fd4..ce2d76f 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -47,7 +47,8 @@ import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.atlas.utils.LruCache;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil.NotificationStat;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
@@ -140,7 +141,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final ServiceState serviceState;
private final AtlasInstanceConverter instanceConverter;
private final AtlasTypeRegistry typeRegistry;
- private final StatisticsUtil statisticsUtil;
+ private final AtlasMetricsUtil metricsUtil;
private final int maxRetries;
private final int failedMsgCacheSize;
private final int minWaitDuration;
@@ -156,10 +157,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled;
-
- private NotificationInterface notificationInterface;
- private ExecutorService executors;
- private Configuration applicationProperties;
+ private final NotificationInterface notificationInterface;
+ private final Configuration applicationProperties;
+ private ExecutorService executors;
@VisibleForTesting
final int consumerRetryInterval;
@@ -170,14 +170,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
ServiceState serviceState, AtlasInstanceConverter instanceConverter,
- AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) throws AtlasException {
+ AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) throws AtlasException {
this.notificationInterface = notificationInterface;
this.atlasEntityStore = atlasEntityStore;
this.serviceState = serviceState;
this.instanceConverter = instanceConverter;
this.typeRegistry = typeRegistry;
this.applicationProperties = ApplicationProperties.get();
- this.statisticsUtil = statisticsUtil;
+ this.metricsUtil = metricsUtil;
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
@@ -475,12 +475,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@VisibleForTesting
void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException {
- AtlasPerfTracer perf = null;
- HookNotification message = kafkaMsg.getMessage();
- String messageUser = message.getUser();
- long startTime = System.currentTimeMillis();
- boolean isFailedMsg = false;
- AuditLog auditLog = null;
+ AtlasPerfTracer perf = null;
+ HookNotification message = kafkaMsg.getMessage();
+ String messageUser = message.getUser();
+ long startTime = System.currentTimeMillis();
+ NotificationStat stats = new NotificationStat();
+ AuditLog auditLog = null;
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
@@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
}
- createOrUpdate(entities, false, context);
+ createOrUpdate(entities, false, stats, context);
}
break;
@@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
// There should only be one root entity
entities.getEntities().get(0).setGuid(guid);
- createOrUpdate(entities, true, context);
+ createOrUpdate(entities, true, stats, context);
}
break;
@@ -562,7 +562,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
- atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
+ EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue()));
+
+ stats.updateStats(response);
} catch (ClassCastException cle) {
LOG.error("Failed to delete entity {}", deleteRequest);
}
@@ -579,7 +581,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
- createOrUpdate(entities, false, context);
+ createOrUpdate(entities, false, stats, context);
}
break;
@@ -593,7 +595,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
}
- createOrUpdate(entities, false, context);
+ createOrUpdate(entities, false, stats, context);
}
break;
@@ -608,7 +610,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
- atlasEntityStore.updateEntity(entityId, entity, true);
+ EntityMutationResponse response = atlasEntityStore.updateEntity(entityId, entity, true);
+
+ stats.updateStats(response);
}
break;
@@ -622,7 +626,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
}
- createOrUpdate(entities, false, context);
+ createOrUpdate(entities, false, stats, context);
}
break;
@@ -640,7 +644,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
- atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
+ EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
+
+ stats.updateStats(response);
}
} catch (ClassCastException cle) {
LOG.error("Failed to do delete entities {}", entities);
@@ -661,7 +667,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
LOG.warn("Max retries exceeded for message {}", strMessage, e);
- isFailedMsg = true;
+ stats.isFailedMsg = true;
failedMessages.add(strMessage);
@@ -689,33 +695,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} finally {
AtlasPerfTracer.log(perf);
- long msgProcessingTime = System.currentTimeMillis() - startTime;
+ stats.timeTakenMs = System.currentTimeMillis() - startTime;
+
+ metricsUtil.onNotificationProcessingComplete(kafkaMsg.getOffset(), stats);
- if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) {
+ if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) {
String strMessage = AbstractNotification.getMessageJson(message);
- LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset());
- LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+ LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
+ LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
}
if (auditLog != null) {
- auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK);
- auditLog.setTimeTaken(msgProcessingTime);
+ auditLog.setHttpStatus(stats.isFailedMsg ? SC_BAD_REQUEST : SC_OK);
+ auditLog.setTimeTaken(stats.timeTakenMs);
AuditFilter.audit(auditLog);
}
- statisticsUtil.setAvgMsgProcessingTime(msgProcessingTime);
}
}
- private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, PreprocessorContext context) throws AtlasBaseException {
+ private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, NotificationStat stats, PreprocessorContext context) throws AtlasBaseException {
List<AtlasEntity> entitiesList = entities.getEntities();
AtlasEntityStream entityStream = new AtlasEntityStream(entities);
if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) {
EntityMutationResponse response = atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
- recordProcessedEntities(response, context);
+ recordProcessedEntities(response, stats, context);
} else {
for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) {
int toIndex = fromIdx + commitBatchSize;
@@ -733,7 +740,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
- recordProcessedEntities(response, context);
+ recordProcessedEntities(response, stats, context);
RequestContext.get().resetEntityGuidUpdates();
@@ -770,8 +777,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumer.commit(partition, kafkaMessage.getOffset() + 1);
commitSucceessStatus = true;
- statisticsUtil.setKafkaOffsets(kafkaMessage.getOffset());
- statisticsUtil.setLastMsgProcessedTime();
} finally {
failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
}
@@ -1021,24 +1026,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
return ret;
}
- private void recordProcessedEntities(EntityMutationResponse mutationResponse, PreprocessorContext context) {
- if (mutationResponse != null && context != null) {
- if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
- context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
+ private void recordProcessedEntities(EntityMutationResponse mutationResponse, NotificationStat stats, PreprocessorContext context) {
+ if (mutationResponse != null) {
+ if (stats != null) {
+ stats.updateStats(mutationResponse);
}
- if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) {
- for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) {
- if (entity != null && entity.getGuid() != null) {
- context.getCreatedEntities().add(entity.getGuid());
+ if (context != null) {
+ if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
+ context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
+ }
+
+ if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) {
+ for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) {
+ if (entity != null && entity.getGuid() != null) {
+ context.getCreatedEntities().add(entity.getGuid());
+ }
}
}
- }
- if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) {
- for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) {
- if (entity != null && entity.getGuid() != null) {
- context.getDeletedEntities().add(entity.getGuid());
+ if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) {
+ for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) {
+ if (entity != null && entity.getGuid() != null) {
+ context.getDeletedEntities().add(entity.getGuid());
+ }
}
}
}
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
index 7887afb..10081ac 100644
--- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
@@ -23,7 +23,7 @@ import org.apache.atlas.ha.AtlasServerIdSelector;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.service.Service;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
@@ -55,18 +55,17 @@ import java.util.Set;
@Component
@Order(1)
public class ActiveInstanceElectorService implements Service, LeaderLatchListener {
-
private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceElectorService.class);
- private final Configuration configuration;
- private final ServiceState serviceState;
- private final ActiveInstanceState activeInstanceState;
- private final StatisticsUtil statisticsUtil;
- private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
- private List<ActiveStateChangeHandler> activeStateChangeHandlers;
- private CuratorFactory curatorFactory;
- private LeaderLatch leaderLatch;
- private String serverId;
+ private final Configuration configuration;
+ private final ServiceState serviceState;
+ private final ActiveInstanceState activeInstanceState;
+ private final AtlasMetricsUtil metricsUtil;
+ private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
+ private List<ActiveStateChangeHandler> activeStateChangeHandlers;
+ private CuratorFactory curatorFactory;
+ private LeaderLatch leaderLatch;
+ private String serverId;
/**
* Create a new instance of {@link ActiveInstanceElectorService}
@@ -78,14 +77,14 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
ActiveInstanceElectorService(Configuration configuration,
Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders,
CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState,
- ServiceState serviceState, StatisticsUtil statisticsUtil) {
- this.configuration = configuration;
+ ServiceState serviceState, AtlasMetricsUtil metricsUtil) {
+ this.configuration = configuration;
this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders;
- this.activeStateChangeHandlers = new ArrayList<>();
- this.curatorFactory = curatorFactory;
- this.activeInstanceState = activeInstanceState;
- this.serviceState = serviceState;
- this.statisticsUtil = statisticsUtil;
+ this.activeStateChangeHandlers = new ArrayList<>();
+ this.curatorFactory = curatorFactory;
+ this.activeInstanceState = activeInstanceState;
+ this.serviceState = serviceState;
+ this.metricsUtil = metricsUtil;
}
/**
@@ -96,9 +95,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
*/
@Override
public void start() throws AtlasException {
- statisticsUtil.setServerStartTime();
+ metricsUtil.onServerStart();
if (!HAConfiguration.isHAEnabled(configuration)) {
- statisticsUtil.setServerActiveTime();
+ metricsUtil.onServerActivation();
LOG.info("HA is not enabled, no need to start leader election service");
return;
}
@@ -156,7 +155,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
}
activeInstanceState.update(serverId);
serviceState.setActive();
- statisticsUtil.setServerActiveTime();
+ metricsUtil.onServerActivation();
} catch (Exception e) {
LOG.error("Got exception while activating", e);
notLeader();
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index c7ba699..fb3ff26 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -25,7 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -82,7 +82,7 @@ public class NotificationHookConsumerKafkaTest {
private AtlasTypeRegistry typeRegistry;
@Mock
- private StatisticsUtil statisticsUtil;
+ private AtlasMetricsUtil metricsUtil;
@BeforeTest
public void setup() throws AtlasException, InterruptedException, AtlasBaseException {
@@ -108,7 +108,7 @@ public class NotificationHookConsumerKafkaTest {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
@@ -127,7 +127,7 @@ public class NotificationHookConsumerKafkaTest {
public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
@@ -163,7 +163,7 @@ public class NotificationHookConsumerKafkaTest {
assertNotNull (consumer);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index de316b6..3e35511 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -26,7 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
import org.apache.atlas.notification.NotificationInterface.NotificationType;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -77,7 +77,7 @@ public class NotificationHookConsumerTest {
private AtlasTypeRegistry typeRegistry;
@Mock
- private StatisticsUtil statisticsUtil;
+ private AtlasMetricsUtil metricsUtil;
@BeforeMethod
public void setup() throws AtlasBaseException {
@@ -96,7 +96,7 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerCanProceedIfServerIsReady() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
@@ -109,7 +109,7 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
@@ -126,7 +126,7 @@ public class NotificationHookConsumerTest {
@Test
public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = mock(EntityCreateRequest.class);
@@ -143,7 +143,7 @@ public class NotificationHookConsumerTest {
@Test
public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
@@ -157,7 +157,7 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
@@ -177,9 +177,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
-
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
notificationHookConsumer.startInternal(configuration, executorService);
verify(notificationInterface).createConsumers(NotificationType.HOOK, 1);
@@ -197,8 +195,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
notificationHookConsumer.startInternal(configuration, executorService);
@@ -217,7 +214,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsActive();
@@ -237,8 +234,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-
- final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
doAnswer(new Answer() {
@Override
@@ -269,8 +265,7 @@ public class NotificationHookConsumerTest {
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-
- final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
@@ -335,7 +330,6 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
-
- return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+ return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
}
}
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java
index 0fe3eba..3ce0c4b 100644
--- a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java
@@ -23,7 +23,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.mockito.InOrder;
@@ -53,7 +53,7 @@ public class ActiveInstanceElectorServiceTest {
private ServiceState serviceState;
@Mock
- private StatisticsUtil statisticsUtil;
+ private AtlasMetricsUtil metricsUtil;
@BeforeMethod
public void setup() {
@@ -75,7 +75,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
verify(leaderLatch).start();
@@ -96,7 +96,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
verify(leaderLatch).addListener(activeInstanceElectorService);
@@ -108,7 +108,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
verifyZeroInteractions(curatorFactory);
@@ -129,7 +129,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.stop();
@@ -151,7 +151,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.stop();
@@ -165,7 +165,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.stop();
verifyZeroInteractions(curatorFactory);
@@ -193,7 +193,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
@@ -216,7 +216,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
@@ -249,7 +249,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
@@ -275,7 +275,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
@@ -310,7 +310,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
- activeInstanceState, serviceState, statisticsUtil);
+ activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.notLeader();
@@ -322,7 +322,7 @@ public class ActiveInstanceElectorServiceTest {
public void testActiveStateSetOnBecomingLeader() {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
- curatorFactory, activeInstanceState, serviceState, statisticsUtil);
+ curatorFactory, activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.isLeader();
@@ -335,7 +335,7 @@ public class ActiveInstanceElectorServiceTest {
public void testPassiveStateSetOnLoosingLeadership() {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
- curatorFactory, activeInstanceState, serviceState, statisticsUtil);
+ curatorFactory, activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.notLeader();
@@ -362,7 +362,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
- curatorFactory, activeInstanceState, serviceState, statisticsUtil);
+ curatorFactory, activeInstanceState, serviceState, metricsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();