You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/04/05 18:24:15 UTC

[atlas] branch master updated: ATLAS-3017: Add Atlas server statistics rest endpoint #renaming fields

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new d6fe41d  ATLAS-3017: Add Atlas server statistics rest endpoint #renaming fields
d6fe41d is described below

commit d6fe41d4c6e014de1fb1eb7a2a618c0d93be6c48
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Fri Apr 5 11:23:53 2019 -0700

    ATLAS-3017: Add Atlas server statistics rest endpoint #renaming fields
    
    Signed-off-by: Sarath Subramanian <ss...@hortonworks.com>
---
 .../org/apache/atlas/model/AtlasStatistics.java    | 19 +++++++------
 .../java/org/apache/atlas/util/StatisticsUtil.java | 32 ++++++++++++++++++----
 2 files changed, 36 insertions(+), 15 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java b/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java
index cb43059..0ecbd9a 100644
--- a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java
+++ b/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java
@@ -35,15 +35,16 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 @JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class AtlasStatistics {
-    public static final String STAT_SERVER_START_TS                = "serverStartTimeStamp";
-    public static final String STAT_SERVER_ACTIVE_TS               = "serverActiveTimeStamp";
-    public static final String STAT_SERVER_UP_SINCE                = "serverUpTime";
-    public static final String STAT_START_OFFSET                   = "KafkaTopic:ATLAS_HOOK:startOffset";
-    public static final String STAT_CURRENT_OFFSET                 = "KafkaTopic:ATLAS_HOOK:currentOffset";
-    public static final String STAT_SOLR_STATUS                    = "solrConnectionStatus";
-    public static final String STAT_HBASE_STATUS                   = "HBaseConnectionStatus";
-    public static final String STAT_LAST_MESSAGE_PROCESSED_TIME_TS = "lastMessageProcessedTimeStamp";
-    public static final String STAT_AVG_MESSAGE_PROCESSING_TIME    = "avgMessageProcessingTime";
+    public static final String STAT_SERVER_START_TS                = "Server:upFrom";
+    public static final String STAT_SERVER_ACTIVE_TS               = "Server:activateFrom";
+    public static final String STAT_SERVER_UP_SINCE                = "Server:upTime";
+    public static final String STAT_START_OFFSET                   = "Notification:ATLAS_HOOK:offsetStart";
+    public static final String STAT_CURRENT_OFFSET                 = "Notification:ATLAS_HOOK:offsetCurrent";
+    public static final String STAT_SOLR_STATUS                    = "ConnectionStatus:Solr";
+    public static final String STAT_HBASE_STATUS                   = "ConnectionStatus:HBase";
+    public static final String STAT_LAST_MESSAGE_PROCESSED_TIME_TS = "Notification:ATLAS_HOOK:messageLastProcessedAt";
+    public static final String STAT_AVG_MESSAGE_PROCESSING_TIME    = "Notification:ATLAS_HOOK:messageAvgProcessingDuration";
+    public static final String STAT_MESSAGES_CONSUMED = "Notification:ATLAS_HOOK:messagesConsumed";
 
     private Map<String, Object> data = new HashMap<>();
 
diff --git a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java b/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java
index d57f350..efb804b 100644
--- a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java
+++ b/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java
@@ -28,9 +28,11 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
+import java.text.NumberFormat;
 import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Locale;
 import java.util.concurrent.*;
 
 import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_START_TS;
@@ -42,6 +44,7 @@ import static org.apache.atlas.model.AtlasStatistics.STAT_SOLR_STATUS;
 import static org.apache.atlas.model.AtlasStatistics.STAT_HBASE_STATUS;
 import static org.apache.atlas.model.AtlasStatistics.STAT_LAST_MESSAGE_PROCESSED_TIME_TS;
 import static org.apache.atlas.model.AtlasStatistics.STAT_AVG_MESSAGE_PROCESSING_TIME;
+import static org.apache.atlas.model.AtlasStatistics.STAT_MESSAGES_CONSUMED;
 
 @Component
 public class StatisticsUtil {
@@ -61,11 +64,14 @@ public class StatisticsUtil {
 
     private long countMsgProcessed        = 0;
     private long totalMsgProcessingTimeMs = 0;
+    private Locale locale                 = new Locale("en", "US");
+    private NumberFormat numberFormat;
 
     @Inject
     public StatisticsUtil(AtlasGraph graph) {
         this.graph = graph;
         this.atlasStatistics = new AtlasStatistics();
+        numberFormat = NumberFormat.getInstance(locale);
     }
 
     public Map<String, Object> getAtlasStatistics() {
@@ -73,14 +79,17 @@ public class StatisticsUtil {
         statisticsMap.putAll(atlasStatistics.getData());
 
         statisticsMap.put(STAT_HBASE_STATUS, getHBaseStatus());
-        statisticsMap.put(STAT_SOLR_STATUS , getSolrStatus());
+        statisticsMap.put(STAT_SOLR_STATUS, getSolrStatus());
         statisticsMap.put(STAT_SERVER_UP_SINCE, getUpSinceTime());
+        if(countMsgProcessed > 0) {
+            statisticsMap.put(STAT_MESSAGES_CONSUMED, countMsgProcessed);
+        }
         formatStatistics(statisticsMap);
 
         return statisticsMap;
     }
 
-    public void setKafkaOffsets(long value){
+    public void setKafkaOffsets(long value) {
         if (Long.parseLong(getStat(STAT_START_OFFSET).toString()) == -1) {
             addStat(STAT_START_OFFSET, value);
         }
@@ -143,7 +152,7 @@ public class StatisticsUtil {
                     break;
 
                 case STAT_AVG_MESSAGE_PROCESSING_TIME:
-                    statisticsMap.put(stat.getKey(), stat.getValue() + " milliseconds");
+                    statisticsMap.put(stat.getKey(), formatNumber(Long.parseLong(stat.getValue().toString())) + " milliseconds");
                     break;
 
                 case STAT_HBASE_STATUS:
@@ -152,13 +161,19 @@ public class StatisticsUtil {
                     statisticsMap.put(stat.getKey(), curState);
                     break;
 
+                case STAT_MESSAGES_CONSUMED:
+                case STAT_START_OFFSET:
+                case STAT_CURRENT_OFFSET:
+                    statisticsMap.put(stat.getKey(), formatNumber(Long.parseLong(stat.getValue().toString())));
+                    break;
+
                 default:
                     statisticsMap.put(stat.getKey(), stat.getValue());
             }
         }
     }
 
-    private boolean getHBaseStatus(){
+    private boolean getHBaseStatus() {
 
         String query = "g.V().next()";
         try {
@@ -180,13 +195,13 @@ public class StatisticsUtil {
         return true;
     }
 
-    private boolean getSolrStatus(){
+    private boolean getSolrStatus() {
         String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + "__type.name\"" + " : (*)";
         try {
             runWithTimeout(new Runnable() {
                 @Override
                 public void run() {
-                        graph.indexQuery(Constants.VERTEX_INDEX, query).vertexTotals();
+                    graph.indexQuery(Constants.VERTEX_INDEX, query).vertexTotals();
                 }
             }, 10, TimeUnit.SECONDS);
         } catch (Exception e) {
@@ -251,4 +266,9 @@ public class StatisticsUtil {
     private String millisToTimeStamp(long ms) {
         return simpleDateFormat.format(ms);
     }
+
+    private String formatNumber(long value) {
+        return numberFormat.format(value);
+    }
+
 }