You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/04/03 12:31:37 UTC
[atlas] branch master updated: ATLAS-3017: Add Atlas server
statistics rest endpoint
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 984445e ATLAS-3017: Add Atlas server statistics rest endpoint
984445e is described below
commit 984445e9111c743f3f255fc4653fc02555a8f095
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Wed Apr 3 05:31:16 2019 -0700
ATLAS-3017: Add Atlas server statistics rest endpoint
Signed-off-by: Sarath Subramanian <ss...@hortonworks.com>
---
.../js/templates/common/Statistics_tmpl.html | 49 +++-
dashboardv2/public/js/views/common/Statistics.js | 7 +-
.../org/apache/atlas/model/AtlasStatistics.java | 78 +++++++
.../org/apache/atlas/services/MetricsService.java | 18 +-
.../java/org/apache/atlas/util/StatisticsUtil.java | 254 +++++++++++++++++++++
.../notification/NotificationHookConsumer.java | 12 +-
.../web/service/ActiveInstanceElectorService.java | 9 +-
.../NotificationHookConsumerKafkaTest.java | 10 +-
.../notification/NotificationHookConsumerTest.java | 26 ++-
.../service/ActiveInstanceElectorServiceTest.java | 32 +--
10 files changed, 446 insertions(+), 49 deletions(-)
diff --git a/dashboardv2/public/js/templates/common/Statistics_tmpl.html b/dashboardv2/public/js/templates/common/Statistics_tmpl.html
index 13e64a7..71643c7 100644
--- a/dashboardv2/public/js/templates/common/Statistics_tmpl.html
+++ b/dashboardv2/public/js/templates/common/Statistics_tmpl.html
@@ -18,8 +18,8 @@
<div class="panel panel-default expand_collapse_panel-icon" data-id="entityActive">
<div class="panel-heading" data-toggle="collapse" href="#collapse1" aria-expanded="true">
<h4 class="panel-title">
- <a>Active Entities <span class="count">(0)</span></a>
- </h4>
+ <a>Active Entities <span class="count">(0)</span></a>
+ </h4>
<div class="btn-group pull-right">
<button type="button" title="Collapse"><i class="ec-icon fa"></i></button>
</div>
@@ -28,10 +28,10 @@
<div class="panel-body">
<table class="table stat-table">
<thead>
- <tr>
- <th>Entity</th>
- <th>Count</th>
- </tr>
+ <tr>
+ <th>Entity</th>
+ <th>Count</th>
+ </tr>
</thead>
<tbody >
</tbody>
@@ -42,8 +42,8 @@
<div class="panel panel-default expand_collapse_panel-icon" data-id="entityDelete">
<div class="panel-heading" data-toggle="collapse" href="#collapse2">
<h4 class="panel-title">
- <a>Deleted Entities <span class="count">(0)</span></a>
- </h4>
+ <a>Deleted Entities <span class="count">(0)</span></a>
+ </h4>
<div class="btn-group pull-right">
<button type="button" title="Collapse"><i class="ec-icon fa"></i></button>
</div>
@@ -52,10 +52,35 @@
<div class="panel-body">
<table class="table stat-table">
<thead>
- <tr>
- <th>Entity</th>
- <th>Count</th>
- </tr>
+ <tr>
+ <th>Entity</th>
+ <th>Count</th>
+ </tr>
+ </thead>
+ <tbody >
+ </tbody>
+ </table>
+ </div>
+ </div>
+ </div>
+
+ <div class="panel panel-default expand_collapse_panel-icon" data-id="stats">
+ <div class="panel-heading" data-toggle="collapse" href="#collapse3">
+ <h4 class="panel-title">
+ <a>Server Statistics </a>
+ </h4>
+ <div class="btn-group pull-right">
+ <button type="button" title="Collapse"><i class="ec-icon fa"></i></button>
+ </div>
+ </div>
+ <div id="collapse3" class="panel-collapse collapse">
+ <div class="panel-body">
+ <table class="table stat-table">
+ <thead>
+ <tr>
+ <th>Parameter</th>
+ <th>Value</th>
+ </tr>
</thead>
<tbody >
</tbody>
diff --git a/dashboardv2/public/js/views/common/Statistics.js b/dashboardv2/public/js/views/common/Statistics.js
index fc1596a..009babe 100644
--- a/dashboardv2/public/js/views/common/Statistics.js
+++ b/dashboardv2/public/js/views/common/Statistics.js
@@ -38,7 +38,8 @@ define(['require',
entityActive: "[data-id='entityActive'] tbody",
entityDelete: "[data-id='entityDelete'] tbody",
entityActiveHeader: "[data-id='entityActive'] .count",
- entityDeletedHeader: "[data-id='entityDelete'] .count"
+ entityDeletedHeader: "[data-id='entityDelete'] .count",
+ stats: "[data-id='stats'] tbody"
},
/** ui events hash */
events: function() {},
@@ -71,7 +72,8 @@ define(['require',
var data = _.first(data.toJSON()),
no_records = '<tr class="empty text-center"><td colspan="2"><span>No records found!</span></td></tr>',
activeEntityTable = _.isEmpty(data.entity.entityActive) ? no_records : that.getTable({ valueObject: data.entity.entityActive }),
- deleteEntityTable = _.isEmpty(data.entity.entityDeleted) ? no_records : that.getTable({ valueObject: data.entity.entityDeleted });
+ deleteEntityTable = _.isEmpty(data.entity.entityDeleted) ? no_records : that.getTable({ valueObject: data.entity.entityDeleted}),
+ stats = _.isEmpty(data.general.stats) ? no_records : that.getTable({ valueObject: data.general.stats, formatIntVal: false});
var totalActive = 0,
totalDeleted = 0;
if (data.entity && data.general.entityCount) {
@@ -84,6 +86,7 @@ define(['require',
}
that.ui.entityActive.html(activeEntityTable);
that.ui.entityDelete.html(deleteEntityTable);
+ that.ui.stats.html(stats);
that.ui.entityActiveHeader.html(" (" + _.numberFormatWithComa((totalActive - totalDeleted)) + ")");
that.ui.entityDeletedHeader.html(" (" + _.numberFormatWithComa(totalDeleted) + ")");
}
diff --git a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java b/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java
new file mode 100644
index 0000000..cb43059
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Atlas statistics
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtlasStatistics {
+ public static final String STAT_SERVER_START_TS = "serverStartTimeStamp";
+ public static final String STAT_SERVER_ACTIVE_TS = "serverActiveTimeStamp";
+ public static final String STAT_SERVER_UP_SINCE = "serverUpTime";
+ public static final String STAT_START_OFFSET = "KafkaTopic:ATLAS_HOOK:startOffset";
+ public static final String STAT_CURRENT_OFFSET = "KafkaTopic:ATLAS_HOOK:currentOffset";
+ public static final String STAT_SOLR_STATUS = "solrConnectionStatus";
+ public static final String STAT_HBASE_STATUS = "HBaseConnectionStatus";
+ public static final String STAT_LAST_MESSAGE_PROCESSED_TIME_TS = "lastMessageProcessedTimeStamp";
+ public static final String STAT_AVG_MESSAGE_PROCESSING_TIME = "avgMessageProcessingTime";
+
+ private Map<String, Object> data = new HashMap<>();
+
+ public Map<String, Object> getData() {
+ return data;
+ }
+
+ public void setData(Map<String, Object> data) {
+ this.data = data;
+ }
+
+ @Override
+ public String toString() {
+ return "AtlasStatistics{" +
+ "data=" + data +
+ '}';
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(data);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AtlasStatistics other = (AtlasStatistics) o;
+
+ return Objects.equals(this.data, other.data);
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
index 607b830..d9ea12a 100644
--- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java
+++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
@@ -18,11 +18,13 @@
package org.apache.atlas.services;
import org.apache.atlas.annotation.AtlasService;
+import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.StatisticsUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
@@ -63,13 +65,14 @@ public class MetricsService {
private final AtlasGraph atlasGraph;
private final AtlasTypeRegistry typeRegistry;
+ private final StatisticsUtil statisticsUtil;
private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix();
@Inject
- public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry) {
+ public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) {
this.atlasGraph = graph;
this.typeRegistry = typeRegistry;
-
+ this.statisticsUtil = statisticsUtil;
}
@SuppressWarnings("unchecked")
@@ -147,6 +150,10 @@ public class MetricsService {
metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, collectionTime);
+ //add atlas server stats
+ Map<String, Object> statistics = statisticsUtil.getAtlasStatistics();
+ metrics.addMetric(GENERAL, "stats", statistics);
+
return metrics;
}
@@ -156,7 +163,12 @@ public class MetricsService {
indexQuery = String.format(indexQuery, typeName, status.name());
- return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals();
+ try {
+ return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals();
+ }catch (Exception e){
+ LOG.error("Failed fetching using indexQuery: " + e.getMessage());
+ }
+ return 0l;
}
private int getAllTypesCount() {
diff --git a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java b/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java
new file mode 100644
index 0000000..d57f350
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.util;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.AtlasStatistics;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_START_TS;
+import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_ACTIVE_TS;
+import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_UP_SINCE;
+import static org.apache.atlas.model.AtlasStatistics.STAT_START_OFFSET;
+import static org.apache.atlas.model.AtlasStatistics.STAT_CURRENT_OFFSET;
+import static org.apache.atlas.model.AtlasStatistics.STAT_SOLR_STATUS;
+import static org.apache.atlas.model.AtlasStatistics.STAT_HBASE_STATUS;
+import static org.apache.atlas.model.AtlasStatistics.STAT_LAST_MESSAGE_PROCESSED_TIME_TS;
+import static org.apache.atlas.model.AtlasStatistics.STAT_AVG_MESSAGE_PROCESSING_TIME;
+
+@Component
+public class StatisticsUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(StatisticsUtil.class);
+
+ private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("d MMM, yyyy : hh:mm aaa z");
+
+ private static final long DAY = 1000 * 60 * 60 * 24;
+ private static final long HOUR = 1000 * 60 * 60;
+ private static final long MIN = 1000 * 60;
+ private static final long SEC = 1000;
+
+ private final AtlasGraph graph;
+ private final String STATUS_CONNECTED = "connected";
+ private final String STATUS_NOT_CONNECTED = "not-connected";
+ private final AtlasStatistics atlasStatistics;
+
+ private long countMsgProcessed = 0;
+ private long totalMsgProcessingTimeMs = 0;
+
+ @Inject
+ public StatisticsUtil(AtlasGraph graph) {
+ this.graph = graph;
+ this.atlasStatistics = new AtlasStatistics();
+ }
+
+ public Map<String, Object> getAtlasStatistics() {
+ Map<String, Object> statisticsMap = new HashMap<>();
+ statisticsMap.putAll(atlasStatistics.getData());
+
+ statisticsMap.put(STAT_HBASE_STATUS, getHBaseStatus());
+ statisticsMap.put(STAT_SOLR_STATUS , getSolrStatus());
+ statisticsMap.put(STAT_SERVER_UP_SINCE, getUpSinceTime());
+ formatStatistics(statisticsMap);
+
+ return statisticsMap;
+ }
+
+ public void setKafkaOffsets(long value){
+ if (Long.parseLong(getStat(STAT_START_OFFSET).toString()) == -1) {
+ addStat(STAT_START_OFFSET, value);
+ }
+ addStat(STAT_CURRENT_OFFSET, ++value);
+ }
+
+ public void setAvgMsgProcessingTime(long value) {
+ countMsgProcessed++;
+ totalMsgProcessingTimeMs += value;
+ value = totalMsgProcessingTimeMs / countMsgProcessed;
+
+ addStat(STAT_AVG_MESSAGE_PROCESSING_TIME, value);
+ }
+
+ public void setLastMsgProcessedTime() {
+ addStat(STAT_LAST_MESSAGE_PROCESSED_TIME_TS, System.currentTimeMillis());
+ }
+
+ public void setServerStartTime() {
+ addStat(STAT_SERVER_START_TS, System.currentTimeMillis());
+ }
+
+ public void setServerActiveTime() {
+ addStat(STAT_SERVER_ACTIVE_TS, System.currentTimeMillis());
+ }
+
+
+ private void addStat(String key, Object value) {
+ Map<String, Object> data = atlasStatistics.getData();
+ if (data == null) {
+ data = new HashMap<>();
+ }
+ data.put(key, value);
+ atlasStatistics.setData(data);
+ }
+
+ private Object getStat(String key) {
+ Map<String, Object> data = atlasStatistics.getData();
+ Object ret = data.get(key);
+ if (ret == null) {
+ return -1;
+ }
+ return ret;
+ }
+
+ private void formatStatistics(Map<String, Object> statisticsMap) {
+ for (Map.Entry<String, Object> stat : statisticsMap.entrySet()) {
+ switch (stat.getKey()) {
+ case STAT_SERVER_UP_SINCE:
+ statisticsMap.put(stat.getKey(), millisToTimeDiff(Long.parseLong(stat.getValue().toString())));
+ break;
+
+ case STAT_LAST_MESSAGE_PROCESSED_TIME_TS:
+ statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
+ break;
+
+ case STAT_SERVER_START_TS:
+ case STAT_SERVER_ACTIVE_TS:
+ statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
+ break;
+
+ case STAT_AVG_MESSAGE_PROCESSING_TIME:
+ statisticsMap.put(stat.getKey(), stat.getValue() + " milliseconds");
+ break;
+
+ case STAT_HBASE_STATUS:
+ case STAT_SOLR_STATUS:
+ String curState = ((boolean) stat.getValue()) ? STATUS_CONNECTED : STATUS_NOT_CONNECTED;
+ statisticsMap.put(stat.getKey(), curState);
+ break;
+
+ default:
+ statisticsMap.put(stat.getKey(), stat.getValue());
+ }
+ }
+ }
+
+ private boolean getHBaseStatus(){
+
+ String query = "g.V().next()";
+ try {
+ runWithTimeout(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ graph.executeGremlinScript(query, false);
+ } catch (AtlasBaseException e) {
+ LOG.error(e.getMessage());
+ }
+ }
+ }, 10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ return false;
+ }
+
+ return true;
+ }
+
+ private boolean getSolrStatus(){
+ String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + "__type.name\"" + " : (*)";
+ try {
+ runWithTimeout(new Runnable() {
+ @Override
+ public void run() {
+ graph.indexQuery(Constants.VERTEX_INDEX, query).vertexTotals();
+ }
+ }, 10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception {
+ runWithTimeout(new Callable<Object>() {
+ @Override
+ public Object call() {
+ runnable.run();
+ return null;
+ }
+ }, timeout, timeUnit);
+ }
+
+ private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ final Future<T> future = executor.submit(callable);
+ executor.shutdown();
+ try {
+ return future.get(timeout, timeUnit);
+ } catch (TimeoutException e) {
+ future.cancel(true);
+ throw e;
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof Error) {
+ throw (Error) t;
+ } else if (t instanceof Exception) {
+ throw (Exception) t;
+ } else {
+ throw new IllegalStateException(t);
+ }
+ }
+ }
+
+ private long getUpSinceTime() {
+ long upTS = Long.parseLong(getStat(STAT_SERVER_START_TS).toString());
+ return System.currentTimeMillis() - upTS;
+ }
+
+ private String millisToTimeDiff(long msDiff) {
+ StringBuilder sb = new StringBuilder();
+
+ long diffSeconds = msDiff / SEC % 60;
+ long diffMinutes = msDiff / MIN % 60;
+ long diffHours = msDiff / HOUR % 24;
+ long diffDays = msDiff / DAY;
+
+ if (diffDays > 0) sb.append(diffDays).append(" day ");
+ if (diffHours > 0) sb.append(diffHours).append(" hour ");
+ if (diffMinutes > 0) sb.append(diffMinutes).append(" min ");
+ if (diffSeconds > 0) sb.append(diffSeconds).append(" sec");
+
+ return sb.toString();
+ }
+
+ private String millisToTimeStamp(long ms) {
+ return simpleDateFormat.format(ms);
+ }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index d1d6003..48355c9 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -46,6 +46,7 @@ import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.atlas.utils.LruCache;
+import org.apache.atlas.util.StatisticsUtil;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
@@ -139,6 +140,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final ServiceState serviceState;
private final AtlasInstanceConverter instanceConverter;
private final AtlasTypeRegistry typeRegistry;
+ private final StatisticsUtil statisticsUtil;
private final int maxRetries;
private final int failedMsgCacheSize;
private final int minWaitDuration;
@@ -168,13 +170,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@Inject
public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore,
ServiceState serviceState, AtlasInstanceConverter instanceConverter,
- AtlasTypeRegistry typeRegistry) throws AtlasException {
+ AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) throws AtlasException {
this.notificationInterface = notificationInterface;
this.atlasEntityStore = atlasEntityStore;
this.serviceState = serviceState;
this.instanceConverter = instanceConverter;
this.typeRegistry = typeRegistry;
this.applicationProperties = ApplicationProperties.get();
+ this.statisticsUtil = statisticsUtil;
maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
@@ -686,7 +689,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
} finally {
AtlasPerfTracer.log(perf);
- long msgProcessingTime = perf != null ? perf.getElapsedTime() : 0;
+ long msgProcessingTime = System.currentTimeMillis() - startTime;
if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) {
String strMessage = AbstractNotification.getMessageJson(message);
@@ -697,10 +700,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
if (auditLog != null) {
auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK);
- auditLog.setTimeTaken(System.currentTimeMillis() - startTime);
+ auditLog.setTimeTaken(msgProcessingTime);
AuditFilter.audit(auditLog);
}
+ statisticsUtil.setAvgMsgProcessingTime(msgProcessingTime);
}
}
@@ -756,6 +760,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
consumer.commit(partition, kafkaMessage.getOffset() + 1);
commitSucceessStatus = true;
+ statisticsUtil.setKafkaOffsets(kafkaMessage.getOffset());
+ statisticsUtil.setLastMsgProcessedTime();
} finally {
failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
}
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
index ad0cb84..7887afb 100644
--- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
@@ -23,6 +23,7 @@ import org.apache.atlas.ha.AtlasServerIdSelector;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.service.Service;
+import org.apache.atlas.util.StatisticsUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
@@ -39,6 +40,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
+
/**
* A service that implements leader election to determine whether this Atlas server is Active.
*
@@ -59,6 +61,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
private final Configuration configuration;
private final ServiceState serviceState;
private final ActiveInstanceState activeInstanceState;
+ private final StatisticsUtil statisticsUtil;
private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
private List<ActiveStateChangeHandler> activeStateChangeHandlers;
private CuratorFactory curatorFactory;
@@ -75,13 +78,14 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
ActiveInstanceElectorService(Configuration configuration,
Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders,
CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState,
- ServiceState serviceState) {
+ ServiceState serviceState, StatisticsUtil statisticsUtil) {
this.configuration = configuration;
this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders;
this.activeStateChangeHandlers = new ArrayList<>();
this.curatorFactory = curatorFactory;
this.activeInstanceState = activeInstanceState;
this.serviceState = serviceState;
+ this.statisticsUtil = statisticsUtil;
}
/**
@@ -92,7 +96,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
*/
@Override
public void start() throws AtlasException {
+ statisticsUtil.setServerStartTime();
if (!HAConfiguration.isHAEnabled(configuration)) {
+ statisticsUtil.setServerActiveTime();
LOG.info("HA is not enabled, no need to start leader election service");
return;
}
@@ -150,6 +156,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
}
activeInstanceState.update(serverId);
serviceState.setActive();
+ statisticsUtil.setServerActiveTime();
} catch (Exception e) {
LOG.error("Got exception while activating", e);
notLeader();
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index b4b88d1..c7ba699 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -25,6 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.*;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.util.StatisticsUtil;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -80,6 +81,9 @@ public class NotificationHookConsumerKafkaTest {
@Mock
private AtlasTypeRegistry typeRegistry;
+ @Mock
+ private StatisticsUtil statisticsUtil;
+
@BeforeTest
public void setup() throws AtlasException, InterruptedException, AtlasBaseException {
MockitoAnnotations.initMocks(this);
@@ -104,7 +108,7 @@ public class NotificationHookConsumerKafkaTest {
produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
@@ -123,7 +127,7 @@ public class NotificationHookConsumerKafkaTest {
public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException {
ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
@@ -159,7 +163,7 @@ public class NotificationHookConsumerKafkaTest {
assertNotNull (consumer);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
consumeOneMessage(consumer, hookConsumer);
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 2618b04..de316b6 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification.HookNotificationType;
import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.util.StatisticsUtil;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -75,6 +76,9 @@ public class NotificationHookConsumerTest {
@Mock
private AtlasTypeRegistry typeRegistry;
+ @Mock
+ private StatisticsUtil statisticsUtil;
+
@BeforeMethod
public void setup() throws AtlasBaseException {
MockitoAnnotations.initMocks(this);
@@ -92,7 +96,7 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerCanProceedIfServerIsReady() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
@@ -105,7 +109,7 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
@@ -122,7 +126,7 @@ public class NotificationHookConsumerTest {
@Test
public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = mock(EntityCreateRequest.class);
@@ -139,7 +143,7 @@ public class NotificationHookConsumerTest {
@Test
public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationConsumer consumer = mock(NotificationConsumer.class);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer);
EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class)));
@@ -153,7 +157,7 @@ public class NotificationHookConsumerTest {
@Test
public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class);
@@ -174,7 +178,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService);
@@ -194,7 +198,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService);
@@ -213,7 +217,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsActive();
@@ -234,7 +238,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
doAnswer(new Answer() {
@Override
@@ -266,7 +270,7 @@ public class NotificationHookConsumerTest {
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
notificationHookConsumer.startInternal(configuration, executorService);
notificationHookConsumer.instanceIsPassive();
@@ -332,6 +336,6 @@ public class NotificationHookConsumerTest {
when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers);
- return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+ return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
}
}
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java
index dd2df70..0fe3eba 100644
--- a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java
@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.util.StatisticsUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.mockito.InOrder;
@@ -51,6 +52,9 @@ public class ActiveInstanceElectorServiceTest {
@Mock
private ServiceState serviceState;
+ @Mock
+ private StatisticsUtil statisticsUtil;
+
@BeforeMethod
public void setup() {
System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, "21000");
@@ -71,7 +75,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
verify(leaderLatch).start();
@@ -92,7 +96,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
verify(leaderLatch).addListener(activeInstanceElectorService);
@@ -104,7 +108,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
verifyZeroInteractions(curatorFactory);
@@ -125,7 +129,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.stop();
@@ -147,7 +151,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.stop();
@@ -161,7 +165,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.stop();
verifyZeroInteractions(curatorFactory);
@@ -189,7 +193,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
@@ -212,7 +216,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
@@ -245,7 +249,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
@@ -271,7 +275,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();
@@ -306,7 +310,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
- activeInstanceState, serviceState);
+ activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.notLeader();
@@ -318,7 +322,7 @@ public class ActiveInstanceElectorServiceTest {
public void testActiveStateSetOnBecomingLeader() {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
- curatorFactory, activeInstanceState, serviceState);
+ curatorFactory, activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.isLeader();
@@ -331,7 +335,7 @@ public class ActiveInstanceElectorServiceTest {
public void testPassiveStateSetOnLoosingLeadership() {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
- curatorFactory, activeInstanceState, serviceState);
+ curatorFactory, activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.notLeader();
@@ -358,7 +362,7 @@ public class ActiveInstanceElectorServiceTest {
ActiveInstanceElectorService activeInstanceElectorService =
new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
- curatorFactory, activeInstanceState, serviceState);
+ curatorFactory, activeInstanceState, serviceState, statisticsUtil);
activeInstanceElectorService.start();
activeInstanceElectorService.isLeader();