You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/03 06:47:37 UTC

incubator-eagle git commit: [EAGLE-690] integrage topology health check with alert engine

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 2bc25969a -> f108c7f45


[EAGLE-690] integrage topology health check with alert engine

integrage topology health check with alert engine

Author: yupu <yu...@ebay.com>

Closes #570 from puyulu/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f108c7f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f108c7f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f108c7f4

Branch: refs/heads/master
Commit: f108c7f456a02d92c70479895f61d3ab724745f8
Parents: 2bc2596
Author: yupu <yu...@ebay.com>
Authored: Thu Nov 3 14:47:28 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Nov 3 14:47:28 2016 +0800

----------------------------------------------------------------------
 .../apache/eagle/topology/TopologyCheckApp.java |  17 +++
 .../eagle/topology/TopologyCheckAppConfig.java  |   8 +-
 .../topology/extractor/TopologyCrawler.java     |   3 +-
 .../extractor/TopologyEntityParser.java         |   6 +-
 .../extractor/TopologyEntityParserResult.java   |   2 +-
 .../hdfs/HdfsTopologyEntityParser.java          |  14 +-
 .../topology/extractor/mr/YarnNodeInfo.java     |  21 ++-
 .../extractor/mr/YarnNodeInfoWrapper.java       |   2 +-
 .../topology/extractor/mr/YarnNodeInfos.java    |   2 +-
 .../topology/resolver/TopologyRackResolver.java |   2 +-
 .../impl/IPMaskTopologyRackResolver.java        |  10 +-
 .../topology/storm/HealthCheckParseBolt.java    |  73 ++++++++++
 .../topology/storm/TopologyDataPersistBolt.java |  72 +++++++++-
 .../topology/utils/EntityBuilderHelper.java     |  18 +--
 .../eagle/topology/utils/JMXQueryHelper.java    |   6 +-
 .../utils/ServiceNotResponseException.java      |  11 +-
 .../eagle/topology/utils/StringUtils.java       |  45 ++++++
 ....eagle.topology.TopologyCheckAppProvider.xml | 142 ++++++++++++-------
 .../src/main/resources/application.conf         |  33 +++--
 .../entity/HBaseServiceTopologyAPIEntity.java   |   4 +-
 .../entity/HdfsServiceTopologyAPIEntity.java    |  11 +-
 .../entity/HealthCheckParseAPIEntity.java       |  67 +++++++++
 .../entity/JournalNodeServiceAPIEntity.java     |   4 +-
 .../entity/MRServiceTopologyAPIEntity.java      |  11 +-
 .../entity/TopologyEntityRepository.java        |   2 +-
 eagle-topology-check/pom.xml                    |  12 ++
 26 files changed, 474 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
index 95bcb4d..93a06f8 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
@@ -23,16 +23,25 @@ import backtype.storm.topology.TopologyBuilder;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.topology.storm.HealthCheckParseBolt;
 import org.apache.eagle.topology.storm.TopologyCheckAppSpout;
 import org.apache.eagle.topology.storm.TopologyDataPersistBolt;
 
 public class TopologyCheckApp extends StormApplication {
+
+    private static final String SINK_TASK_NUM = "topology.numOfSinkTasks";
+    private static final String TOPOLOGY_HEALTH_CHECK_STREAM = "topology_health_check_stream";
+
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
         TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
 
         String spoutName = TopologyCheckAppConfig.TOPOLOGY_DATA_FETCH_SPOUT_NAME;
         String persistBoltName = TopologyCheckAppConfig.TOPOLOGY_ENTITY_PERSIST_BOLT_NAME;
+        String parseBoltName = TopologyCheckAppConfig.PARSE_BOLT_NAME;
+        String kafkaSinkBoltName = TopologyCheckAppConfig.SINK_BOLT_NAME;
+        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
 
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout(
@@ -47,6 +56,14 @@ public class TopologyCheckApp extends StormApplication {
             topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt
         ).setNumTasks(topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(spoutName);
 
+        topologyBuilder.setBolt(
+            parseBoltName,
+            new HealthCheckParseBolt(),
+            topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(persistBoltName);
+
+        StormStreamSink<?> sinkBolt = environment.getStreamSink(TOPOLOGY_HEALTH_CHECK_STREAM, config);
+        topologyBuilder.setBolt(kafkaSinkBoltName, sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks).shuffleGrouping(parseBoltName);
+
         return topologyBuilder.createTopology();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index 409a87f..0b7cb3d 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -32,6 +32,8 @@ public class TopologyCheckAppConfig implements Serializable {
 
     public static final String TOPOLOGY_DATA_FETCH_SPOUT_NAME = "topologyDataFetcherSpout";
     public static final String TOPOLOGY_ENTITY_PERSIST_BOLT_NAME = "topologyEntityPersistBolt";
+    public static final String PARSE_BOLT_NAME = "parserBolt";
+    public static final String SINK_BOLT_NAME = "sinkBolt";
 
     private static final int MAX_NUM_THREADS = 10;
     private static final String HBASE_ZOOKEEPER_CLIENT_PORT = "2181";
@@ -98,7 +100,7 @@ public class TopologyCheckAppConfig implements Serializable {
         if (config.hasPath("dataSourceConfig.mr")) {
             topologyTypes.add(TopologyConstants.TopologyType.MR);
             mrConfig = new MRConfig();
-            mrConfig.rmUrls =  config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*");
+            mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*");
             mrConfig.historyServerUrl = getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null);
         }
 
@@ -129,12 +131,12 @@ public class TopologyCheckAppConfig implements Serializable {
     }
 
     public static class MRConfig implements Serializable {
-        public String [] rmUrls;
+        public String[] rmUrls;
         public String historyServerUrl;
     }
 
     public static class HdfsConfig implements Serializable {
-        public String [] namenodeUrls;
+        public String[] namenodeUrls;
     }
 
     private String getOptionalConfig(String key, String defaultValue) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java
index 55eba15..2451db6 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java
@@ -21,7 +21,8 @@ package org.apache.eagle.topology.extractor;
 public interface TopologyCrawler {
 
     /**
-     * Fetch raw data and emit the parsed result to the next bolt
+     * Fetch raw data and emit the parsed result to the next bolt.
+     *
      * @return
      */
     void extract();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
index 6a860ad..64c9bd4 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
@@ -24,19 +24,19 @@ import java.io.IOException;
 
 public interface TopologyEntityParser {
     /**
-     * Parse hadoop topology and return the topology entity results
+     * Parse hadoop topology and return the topology entity results.
      * @return the topology entity result
      */
     public TopologyEntityParserResult parse(long timestamp) throws IOException;
 
     /**
-     * Get topology type for the parser
+     * Get topology type for the parser.
      * @return topology type
      */
     public TopologyConstants.TopologyType getTopologyType();
 
     /**
-     * Get hadoop version for the parser
+     * Get hadoop version for the parser.
      * @return
      */
     public TopologyConstants.HadoopVersion getHadoopVersion();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
index f5746f7..1799054 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
@@ -21,7 +21,6 @@ package org.apache.eagle.topology.extractor;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.topology.TopologyConstants;
 import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -58,6 +57,7 @@ public class TopologyEntityParserResult {
     public TopologyConstants.HadoopVersion getVersion() {
         return version;
     }
+
     public void setVersion(TopologyConstants.HadoopVersion version) {
         this.version = version;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
index 1cec474..c35ed18 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
@@ -47,7 +47,7 @@ import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
 public class HdfsTopologyEntityParser implements TopologyEntityParser {
 
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class);
-    private String [] namenodeUrls;
+    private String[] namenodeUrls;
     private String site;
     private TopologyRackResolver rackResolver;
 
@@ -78,7 +78,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
 
     private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)";
     private static final String QJM_PATTERN = "([\\d\\.]+):\\d+";
-    
+
     private static final double TB = 1024 * 1024 * 1024 * 1024;
 
     public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) {
@@ -118,9 +118,9 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
         if (bean == null || bean.getPropertyMap() == null) {
             throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!");
         }
-        final String hostname = (String)bean.getPropertyMap().get(HA_NAME);
+        final String hostname = (String) bean.getPropertyMap().get(HA_NAME);
         HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
-        final String state = (String)bean.getPropertyMap().get(HA_STATE);
+        final String state = (String) bean.getPropertyMap().get(HA_STATE);
         result.setStatus(state);
         final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB);
         result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024));
@@ -155,9 +155,9 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
         JSONObject jsonMap = (JSONObject) jsonArray.get(0);
 
         Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>();
-        String QJM = jsonMap.getString("manager");
+        String manager = jsonMap.getString("manager");
         Pattern qjm = Pattern.compile(QJM_PATTERN);
-        Matcher jpmMatcher = qjm.matcher(QJM);
+        Matcher jpmMatcher = qjm.matcher(manager);
         while (jpmMatcher.find()) {
             String ip = jpmMatcher.group(1);
             String hostname = EntityBuilderHelper.resolveHostByIp(ip);
@@ -196,7 +196,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
         int numDeadDecommNodes = 0;
 
         String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES);
-        JSONTokener tokener  = new JSONTokener(deadNodesStrings);
+        JSONTokener tokener = new JSONTokener(deadNodesStrings);
         JSONObject jsonNodesObject = new JSONObject(tokener);
         final JSONArray deadNodes = jsonNodesObject.names();
         for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
index 9315f78..4ca2b06 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
@@ -21,7 +21,7 @@ package org.apache.eagle.topology.extractor.mr;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class YarnNodeInfo {
 
@@ -40,60 +40,79 @@ public class YarnNodeInfo {
     public String getRack() {
         return rack;
     }
+
     public void setRack(String rack) {
         this.rack = rack;
     }
+
     public String getState() {
         return state;
     }
+
     public void setState(String state) {
         this.state = state;
     }
+
     public String getId() {
         return id;
     }
+
     public void setId(String id) {
         this.id = id;
     }
+
     public String getNodeHostName() {
         return nodeHostName;
     }
+
     public void setNodeHostName(String nodeHostName) {
         this.nodeHostName = nodeHostName;
     }
+
     public String getNodeHTTPAddress() {
         return nodeHTTPAddress;
     }
+
     public void setNodeHTTPAddress(String nodeHTTPAddress) {
         this.nodeHTTPAddress = nodeHTTPAddress;
     }
+
     public String getLastHealthUpdate() {
         return lastHealthUpdate;
     }
+
     public void setLastHealthUpdate(String lastHealthUpdate) {
         this.lastHealthUpdate = lastHealthUpdate;
     }
+
     public String getHealthReport() {
         return healthReport;
     }
+
     public void setHealthReport(String healthReport) {
         this.healthReport = healthReport;
     }
+
     public String getNumContainers() {
         return numContainers;
     }
+
     public void setNumContainers(String numContainers) {
         this.numContainers = numContainers;
     }
+
     public String getUsedMemoryMB() {
         return usedMemoryMB;
     }
+
     public void setUsedMemoryMB(String usedMemoryMB) {
         this.usedMemoryMB = usedMemoryMB;
     }
+
     public String getAvailMemoryMB() {
         return availMemoryMB;
     }
+
     public void setAvailMemoryMB(String availMemoryMB) {
         this.availMemoryMB = availMemoryMB;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
index 83d8d7f..8079e49 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
@@ -21,7 +21,7 @@ package org.apache.eagle.topology.extractor.mr;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class YarnNodeInfoWrapper {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
index d715a1e..536edd8 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
@@ -24,7 +24,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 import java.util.List;
 
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class YarnNodeInfos {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
index ab7b3cc..c4b4976 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
@@ -21,7 +21,7 @@ package org.apache.eagle.topology.resolver;
 public interface TopologyRackResolver {
 
     /**
-     *resolve rack by hostname
+     *resolve rack by hostname.
      * @return rack name
      */
     String resolve(String hostname);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
index df0f863..821de3c 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
@@ -24,19 +24,19 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 /**
- * resolve rack by hostname
+ * resolve rack by hostname.
  */
 public class IPMaskTopologyRackResolver implements TopologyRackResolver {
 
-    private final int DEFAULT_RACK_POS = 2;
+    private final int pos = 2;
     private int rackPos;
 
     public IPMaskTopologyRackResolver() {
-        this.rackPos = DEFAULT_RACK_POS;
+        this.rackPos = pos;
     }
 
     public IPMaskTopologyRackResolver(int rackPos) {
-        this.rackPos = (rackPos > 3 || rackPos < 0) ? DEFAULT_RACK_POS : rackPos;
+        this.rackPos = (rackPos > 3 || rackPos < 0) ? pos : rackPos;
     }
 
     @Override
@@ -44,7 +44,7 @@ public class IPMaskTopologyRackResolver implements TopologyRackResolver {
         String result = null;
         try {
             InetAddress address = InetAddress.getByName(hostname);
-            result = "rack" + (int)(address.getAddress()[rackPos] & 0xff);
+            result = "rack" + (int) (address.getAddress()[rackPos] & 0xff);
         } catch (UnknownHostException e) {
             //e.printStackTrace();
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java
new file mode 100644
index 0000000..a0b282b
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eagle.topology.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class HealthCheckParseBolt extends BaseRichBolt {
+
+    private static Logger LOG = LoggerFactory.getLogger(HealthCheckParseBolt.class);
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.collector = outputCollector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        HealthCheckParseAPIEntity result = null;
+        try {
+            result = (HealthCheckParseAPIEntity) tuple.getValueByField("f1");
+            Map<String, Object> map = new TreeMap<>();
+            map.put("status", result.getStatus());
+            map.put("timestamp", result.getTimeStamp());
+            map.put("role", result.getRole());
+            map.put("host", result.getHost());
+            map.put("site", result.getSite());
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("emitted " + map);
+            }
+            collector.emit(Arrays.asList(result.getHost(), map));
+        } catch (Exception ex) {
+            LOG.error("Failing parse security log message, and ignore this message", ex);
+        } finally {
+            collector.ack(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
index 9b0eb82..e0ff3cd 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -18,11 +18,20 @@
 
 package org.apache.eagle.topology.storm;
 
+import static org.apache.eagle.topology.TopologyConstants.HOSTNAME_TAG;
+import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
+import static org.apache.eagle.topology.TopologyConstants.ROLE_TAG;
+import static org.apache.eagle.topology.TopologyConstants.SITE_TAG;
+
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.EagleServiceClientException;
@@ -32,6 +41,10 @@ import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.apache.eagle.topology.TopologyCheckAppConfig;
 import org.apache.eagle.topology.TopologyConstants;
 import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
 import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +66,8 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig()));
+        this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig().getString("service.host"), this.config.getConfig().getInt("service.port"),
+            this.config.getConfig().getString("service.username"), this.config.getConfig().getString("service.password")));
         this.collector = collector;
     }
 
@@ -84,9 +98,10 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
             deleteEntities(entitiesForDeletion, serviceName);
             writeEntities(entitiesToWrite, serviceName);
             writeEntities(result.getMetrics(), serviceName);
+            emitToKafkaBolt(result);
             this.collector.ack(input);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOG.error(e.getMessage(), e);
             this.collector.fail(input);
         }
     }
@@ -100,7 +115,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
+        declarer.declare(new Fields("f1"));
     }
 
     private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) {
@@ -112,9 +127,9 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
                 LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName);
             }
         } catch (EagleServiceClientException e) {
-            e.printStackTrace();
+            LOG.error(e.getMessage(), e);
         } catch (IOException e) {
-            e.printStackTrace();
+            LOG.error(e.getMessage(), e);
         }
         entities.clear();
     }
@@ -135,8 +150,51 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
 
     private String generateKey(TopologyBaseAPIEntity entity) {
         return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
-                entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
-                entity.getTags().get(TopologyConstants.ROLE_TAG));
+            entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
+            entity.getTags().get(TopologyConstants.ROLE_TAG));
     }
 
+    private void emitToKafkaBolt(TopologyEntityParserResult result) {
+
+        List<HealthCheckParseAPIEntity> healthCheckParseAPIList = new ArrayList<HealthCheckParseAPIEntity>();
+
+        setNodeInfo(result.getMasterNodes(), healthCheckParseAPIList);
+
+        setNodeInfo(result.getSlaveNodes(), healthCheckParseAPIList);
+
+        for (HealthCheckParseAPIEntity healthCheckAPIEntity : healthCheckParseAPIList) {
+            this.collector.emit(new Values(healthCheckAPIEntity));
+        }
+
+    }
+
+    private void setNodeInfo(List<TopologyBaseAPIEntity> topologyBaseAPIList, List<HealthCheckParseAPIEntity> healthCheckParseAPIList) {
+        HealthCheckParseAPIEntity healthCheckAPIEntity = null;
+        for (Iterator<TopologyBaseAPIEntity> iterator = topologyBaseAPIList.iterator(); iterator.hasNext(); ) {
+
+            healthCheckAPIEntity = new HealthCheckParseAPIEntity();
+            TopologyBaseAPIEntity topologyBaseAPIEntity = iterator.next();
+
+            if (topologyBaseAPIEntity instanceof HBaseServiceTopologyAPIEntity) {
+
+                healthCheckAPIEntity.setStatus(((HBaseServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
+
+            }
+            if (topologyBaseAPIEntity instanceof HdfsServiceTopologyAPIEntity) {
+
+                healthCheckAPIEntity.setStatus(((HdfsServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
+            }
+
+            if (topologyBaseAPIEntity instanceof MRServiceTopologyAPIEntity) {
+
+                healthCheckAPIEntity.setStatus(((MRServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
+            }
+
+            healthCheckAPIEntity.setTimeStamp(topologyBaseAPIEntity.getTimestamp());
+            healthCheckAPIEntity.setHost(topologyBaseAPIEntity.getTags().get(HOSTNAME_TAG));
+            healthCheckAPIEntity.setRole(topologyBaseAPIEntity.getTags().get(ROLE_TAG));
+            healthCheckAPIEntity.setSite(topologyBaseAPIEntity.getTags().get(SITE_TAG));
+            healthCheckParseAPIList.add(healthCheckAPIEntity);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
index 9bad73b..f390fa8 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
@@ -21,16 +21,12 @@ package org.apache.eagle.topology.utils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
-import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.eagle.topology.TopologyConstants.*;
-
 public class EntityBuilderHelper {
 
     public static String resolveHostByIp(String ip) {
@@ -48,7 +44,7 @@ public class EntityBuilderHelper {
         metricEntity.setTimestamp(timestamp);
         metricEntity.setTags(tags);
         metricEntity.setPrefix(metricName);
-        metricEntity.setValue(new double[]{value});
+        metricEntity.setValue(new double[] {value});
         return metricEntity;
     }
 
@@ -59,12 +55,12 @@ public class EntityBuilderHelper {
         String metricName = String.format(TopologyConstants.METRIC_LIVE_RATIO_NAME_FORMAT, role);
         return EntityBuilderHelper.metricWrapper(timestamp, metricName, value, tags);
     }
-    
-    public static String getValidHostName(String key){
-    	if(StringUtils.isBlank(key)){
-    		throw new IllegalArgumentException("key can not be empty");
-    	}    	
-    	return key.indexOf(TopologyConstants.COLON) > 0 ? key.substring(0,key.indexOf(TopologyConstants.COLON)) : key;    	
+
+    public static String getValidHostName(String key) {
+        if (StringUtils.isBlank(key)) {
+            throw new IllegalArgumentException("key can not be empty");
+        }
+        return key.indexOf(TopologyConstants.COLON) > 0 ? key.substring(0, key.indexOf(TopologyConstants.COLON)) : key;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
index fa4c71f..bca8485 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
@@ -34,7 +34,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Helper class to query Hadoop JMX servlets
+ * Helper class to query Hadoop JMX servlets.
  */
 public final class JMXQueryHelper {
 
@@ -66,13 +66,13 @@ public final class JMXQueryHelper {
         final JSONArray jsonArray = jsonBeansObject.getJSONArray("beans");
         int size = jsonArray.length();
         for (int i = 0; i < size; ++i) {
-            final JSONObject obj = (JSONObject)jsonArray.get(i);
+            final JSONObject obj = (JSONObject) jsonArray.get(i);
             final JMXBean bean = new JMXBean();
             final Map<String, Object> map = new HashMap<String, Object>();
             bean.setPropertyMap(map);
             final JSONArray names = obj.names();
             int jsonSize = names.length();
-            for (int j = 0 ; j < jsonSize; ++j) {
+            for (int j = 0; j < jsonSize; ++j) {
                 final String key = names.getString(j);
                 Object value = obj.get(key);
                 map.put(key, value);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
index 48c9133..3204ca3 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
@@ -25,14 +25,14 @@ public class ServiceNotResponseException extends IOException {
     private static final long serialVersionUID = -2425311876734366496L;
 
     /**
-     * Default constructor of FeederException
+     * Default constructor of FeederException.
      */
     public ServiceNotResponseException() {
         super();
     }
 
     /**
-     * Constructor of FeederException
+     * Constructor of FeederException.
      *
      * @param message error message
      */
@@ -41,18 +41,17 @@ public class ServiceNotResponseException extends IOException {
     }
 
     /**
-     * Constructor of FeederException
+     * Constructor of FeederException.
      *
      * @param message error message
-     * @param cause the cause of the exception
-     *
+     * @param cause   the cause of the exception
      */
     public ServiceNotResponseException(String message, Throwable cause) {
         super(message, cause);
     }
 
     /**
-     * Constructor of FeederException
+     * Constructor of FeederException.
      *
      * @param cause the cause of the exception
      */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java
new file mode 100644
index 0000000..3b47f84
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java
@@ -0,0 +1,45 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public final class StringUtils {
+
+    public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    private StringUtils() {
+
+    }
+
+    public static String convertMapToString(Map<String, String> tags) {
+        StringBuilder tagBuilder = new StringBuilder();
+        Iterator<Entry<String, String>> iter = tags.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, String> entry = (Map.Entry<String, String>) iter.next();
+            tagBuilder.append(entry.getKey() + ":" + entry.getValue());
+            if (iter.hasNext()) {
+                tagBuilder.append(",");
+            }
+        }
+        return tagBuilder.toString();
+    }
+} 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index ed31a8f..8dbf79e 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -22,128 +22,166 @@
     <name>Topology Health Check</name>
     <version>0.5.0-incubating</version>
     <configuration>
-        <!-- org.apache.eagle.topology.TopologyCheckApp -->
+        <!-- org.apache.eagle.topology.TopologyCheckApp -->   
         <property>
             <name>dataExtractorConfig.site</name>
             <displayName>site</displayName>
             <description>Site</description>
             <value>sandbox</value>
-        </property>
+        </property>  
         <property>
             <name>dataExtractorConfig.fetchDataIntervalInSecs</name>
-            <displayName>FetchDataIntervalInSecs</displayName>
+            <displayName>Fetch Data Interval in Secs</displayName>
             <description>Fetch Data Interval in Secs</description>
             <value>300</value>
         </property>
         <property>
             <name>dataExtractorConfig.parseThreadPoolSize</name>
-            <displayName>parseThreadPoolSize</displayName>
+            <displayName>Parser Thread Pool Size</displayName>
             <description>Parser Thread Pool Size</description>
             <value>5</value>
         </property>
         <property>
             <name>dataExtractorConfig.numDataFetcherSpout</name>
-            <displayName>numDataFetcherSpout</displayName>
+            <displayName>Spout Task Number</displayName>
             <description>Spout Task Number</description>
             <value>1</value>
         </property>
         <property>
             <name>dataExtractorConfig.numEntityPersistBolt</name>
-            <displayName>numEntityPersistBolt</displayName>
+            <displayName>Bolt Task Number</displayName>
             <description>Bolt Task Number</description>
             <value>1</value>
-        </property>
+        </property> 
+        
+        <property>
+            <name>dataExtractorConfig.rackResolverCls</name>
+            <displayName>Rack Resolver Class</displayName>
+            <description>rack resolver class</description>
+            <value>org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver</value>
+        </property>     
+      
         <property>
             <name>dataSourceConfig.hbase.zkQuorum</name>
-            <displayName>zkQuorum</displayName>
-            <description>Zookeeper Quorum</description>
+            <displayName>HBase Zookeeper Quorum</displayName>
+            <description>HBase Zookeeper Quorum</description>
             <value>sandbox.hortonworks.com:2181</value>
+            <required>true</required>
         </property>
         <property>
             <name>dataSourceConfig.hbase.zkZnodeParent</name>
-            <displayName>zkZnodeParent</displayName>
+            <displayName>Hbase Zookeeper Znode Parent Root</displayName>
             <description>Hbase Zookeeper Znode Parent Root</description>
             <value>/hbase-unsecure</value>
+            <required>true</required>
         </property>
         <property>
             <name>dataSourceConfig.hbase.zkPropertyClientPort</name>
-            <displayName>zkPropertyClientPort</displayName>
+            <displayName>Hbase Zookeeper Client Port</displayName>
             <description>Hbase Zookeeper Client Port</description>
             <value>2181</value>
+            <required>true</required>
         </property>
         <property>
             <name>dataSourceConfig.hbase.kerberos.master.principal</name>
-            <displayName>hbaseMasterPrincipal</displayName>
+            <displayName>Hbase Master Principal</displayName>
             <description>Hbase Master Principal</description>
             <value>hadoop/_HOST@EXAMPLE.COM</value>
         </property>
-        <property>
-            <name>dataSourceConfig.hbase.kerberos.eagle.keytab</name>
-            <displayName>eagleKeytab</displayName>
-            <description>Eagle keytab</description>
-            <value></value>
-        </property>
+     
         <property>
             <name>dataSourceConfig.hbase.kerberos.master.principal</name>
-            <displayName>hbaseMasterPrincipal</displayName>
+            <displayName>Hbase Master Principal</displayName>
             <description>Hbase Master Principal</description>
             <value>hadoop/_HOST@EXAMPLE.COM</value>
-        </property>
-
+        </property>  
+        
         <property>
             <name>dataSourceConfig.hdfs.namenodeUrl</name>
-            <displayName>hdfsNamenodeUrl</displayName>
+            <displayName>Hdfs Namenode Web URL</displayName>
             <description>Hdfs Namenode Web URL</description>
             <value>http://sandbox.hortonworks.com:50070</value>
+            <required>true</required>
         </property>
-
         <property>
             <name>dataSourceConfig.mr.rmUrl</name>
-            <displayName>resourceManagerUrl</displayName>
+            <displayName>Resource Manager URL</displayName>
             <description>Resource Manager URL</description>
             <value>http://sandbox.hortonworks.com:8088</value>
+            <required>true</required>
         </property>
 
         <property>
             <name>dataSourceConfig.mr.historyServerUrl</name>
-            <displayName>historyServerUrl</displayName>
-            <description>History Server URL</description>
-            <value></value>
-        </property>
-
-        <property>
-            <name>eagleProps.eagleService.host</name>
-            <description>eagleProps.eagleService.host</description>
-            <value>localhost</value>
-        </property>
-        <property>
-            <name>eagleProps.eagleService.port</name>
-            <description>eagleProps.eagleService.port</description>
-            <value>9090</value>
+            <displayName>History Server URL</displayName>
+            <description>History Server URL</description>            
+            <value></value>  
+        </property> 
+        
+        <property>
+            <name>topology.numOfSinkTasks</name>
+            <displayName>topology.numOfSinkTasks</displayName>
+            <value>2</value>
+            <description>number of sink tasks</description>
         </property>
+        
+        <!-- data sink configurations -->
         <property>
-            <name>eagleProps.eagleService.username</name>
-            <description>eagleProps.eagleService.username</description>
-            <value>admin</value>
+            <name>dataSinkConfig.topic</name>
+            <displayName>Topic For Kafka Data Sink</displayName>
+            <value>topology_health_check</value>
+            <description>Topic For Kafka Data Sink</description>
         </property>
         <property>
-            <name>eagleProps.eagleService.password</name>
-            <description>eagleProps.eagleService.password</description>
-            <value>secret</value>
+            <name>dataSinkConfig.brokerList</name>
+            <displayName>Kafka Broker List</displayName>
+            <value>server.eagle.apache.org:9092</value>
+            <description>Kafka Broker List</description>
         </property>
         <property>
-            <name>eagleProps.eagleService.basePath</name>
-            <description>eagleProps.eagleService.basePath</description>
-            <value>/rest</value>
+            <name>dataSinkConfig.serializerClass</name>
+            <displayName>Serializer Class Kafka Message Value</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>Serializer Class Kafka Message Value</description>
         </property>
         <property>
-            <name>eagleProps.eagleService.readTimeOutSeconds</name>
-            <displayName>eagleProps.eagleService.readTimeOutSeconds</displayName>
-            <description>The maximum amount of time (in seconds) the app is trying to read from eagle service</description>
-            <value>2</value>
+            <name>dataSinkConfig.keySerializerClass</name>
+            <displayName>Serializer Class Kafka Message Key</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>Serializer Class Kafka Message Key</description>
         </property>
-
+        
     </configuration>
+    <streams>
+        <stream>
+            <streamId>topology_health_check_stream</streamId>
+            <description>topology health check Stream</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>    
+            <columns>
+                <column>
+                    <name>status</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>host</name>
+                    <type>string</type>
+                </column> 
+                <column>
+                    <name>site</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>role</name>
+                    <type>string</type>
+                </column>            
+            </columns>
+        </stream>
+    </streams>
     <docs>
         <install>
         </install>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
index cbc7ac1..f069df5 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
@@ -18,7 +18,11 @@
   mode : "LOCAL",
   workers : 1,
 
-  dataExtractorConfig : {
+  topology : {
+    "numOfSinkTasks" : 2
+  }
+
+  dataExtractorConfig : {  
     "site": "sandbox",
     "fetchDataIntervalInSecs": 300,
     "parseThreadPoolSize": 5,
@@ -41,20 +45,23 @@
       }
     },
     mr: {
-      rmUrl: "http://sandbox.hortonworks.com:8088",
+      rmUrl: "http://sandbox.hortonworks.com:50030",
       historyServerUrl : "http://sandbox.hortonworks.com:19888"  #if not need, then empty
     }
   }
-
-  eagleProps : {
-    "mailHost" : "abc.com",
-    "mailDebug" : "true",
-    eagleService.host:"localhost",
-    eagleService.port: 9090,
-    eagleService.username: "admin",
-    eagleService.password : "secret",
-    eagleService.basePath : "/rest",
-    eagleService.readTimeOutSeconds : 20,
-    eagleService.maxFlushNum : 500
+  
+  "dataSinkConfig": {
+    "topic" : "topology_health_check",
+    "brokerList" : "sandbox.hortonworks.com:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder" 
+  }
+  
+  "service": {
+    "host": "localhost",
+    "port": 9090,
+    "username": "admin",
+    "password": "secret",
+    "readTimeOutSeconds" : 10,
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java
index 24761dc..d91ff5c 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java
@@ -22,13 +22,13 @@ import org.apache.eagle.log.entity.meta.*;
 import org.apache.eagle.topology.TopologyConstants;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(include= JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("hadoop_topology")
 @ColumnFamily("f")
 @Prefix("hbaseservicestatus")
 @Service(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME)
 @TimeSeries(false)
-public class HBaseServiceTopologyAPIEntity  extends TopologyBaseAPIEntity {
+public class HBaseServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
     @Column("a")
     private String status;
     @Column("b")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java
index 0ef6d0a..baca97b 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java
@@ -22,7 +22,7 @@ import org.apache.eagle.log.entity.meta.*;
 import org.apache.eagle.topology.TopologyConstants;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("hadoop_topology")
 @ColumnFamily("f")
 @Prefix("hdfsservicestatus")
@@ -56,34 +56,43 @@ public class HdfsServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
     public String getNumFailedVolumes() {
         return numFailedVolumes;
     }
+
     public void setNumFailedVolumes(String numFailedVolumes) {
         this.numFailedVolumes = numFailedVolumes;
         valueChanged("numFailedVolumes");
     }
+
     public String getNumBlocks() {
         return numBlocks;
     }
+
     public void setNumBlocks(String numBlocks) {
         this.numBlocks = numBlocks;
         valueChanged("numBlocks");
     }
+
     public String getStatus() {
         return status;
     }
+
     public void setStatus(String status) {
         this.status = status;
         valueChanged("status");
     }
+
     public String getConfiguredCapacityTB() {
         return configuredCapacityTB;
     }
+
     public void setConfiguredCapacityTB(String configuredCapacityTB) {
         this.configuredCapacityTB = configuredCapacityTB;
         valueChanged("configuredCapacityTB");
     }
+
     public String getUsedCapacityTB() {
         return usedCapacityTB;
     }
+
     public void setUsedCapacityTB(String usedCapacityTB) {
         this.usedCapacityTB = usedCapacityTB;
         valueChanged("usedCapacityTB");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java
new file mode 100644
index 0000000..d78fea6
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.topology.entity;
+
+public class HealthCheckParseAPIEntity {
+
+    private String status;
+    private long timeStamp;
+    private String host;
+    private String role;
+    private String site;
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public long getTimeStamp() {
+        return timeStamp;
+    }
+
+    public void setTimeStamp(long timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public String getRole() {
+        return role;
+    }
+
+    public void setRole(String role) {
+        this.role = role;
+    }
+
+    public String getSite() {
+        return site;
+    }
+
+    public void setSite(String site) {
+        this.site = site;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java
index f21ad7a..392b107 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java
@@ -22,13 +22,13 @@ import org.apache.eagle.log.entity.meta.*;
 import org.apache.eagle.topology.TopologyConstants;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("hadoop_topology")
 @ColumnFamily("f")
 @Prefix("journalnodestatus")
 @Service(TopologyConstants.JN_INSTANCE_SERVICE_NAME)
 @TimeSeries(false)
-@Tags({TopologyConstants.SITE_TAG, TopologyConstants.HOSTNAME_TAG, TopologyConstants.RACK_TAG,  TopologyConstants.ROLE_TAG})
+@Tags( {TopologyConstants.SITE_TAG, TopologyConstants.HOSTNAME_TAG, TopologyConstants.RACK_TAG, TopologyConstants.ROLE_TAG})
 public class JournalNodeServiceAPIEntity extends TopologyBaseAPIEntity {
     @Column("a")
     private long writtenTxidDiff;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
index 75dfbfb..82e57fd 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
@@ -22,13 +22,13 @@ import org.apache.eagle.log.entity.meta.*;
 import org.apache.eagle.topology.TopologyConstants;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("hadoop_topology")
 @ColumnFamily("f")
 @Prefix("mrservicestatus")
 @Service(TopologyConstants.MR_INSTANCE_SERVICE_NAME)
 @TimeSeries(false)
-public class MRServiceTopologyAPIEntity  extends TopologyBaseAPIEntity {
+public class MRServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
     @Column("a")
     private String status;
     @Column("b")
@@ -52,27 +52,34 @@ public class MRServiceTopologyAPIEntity  extends TopologyBaseAPIEntity {
     public String getStatus() {
         return status;
     }
+
     public void setStatus(String status) {
         this.status = status;
         valueChanged("status");
     }
+
     public String getNumConfiguredMapSlots() {
         return numConfiguredMapSlots;
     }
+
     public void setNumConfiguredMapSlots(String numConfiguredMapSlots) {
         this.numConfiguredMapSlots = numConfiguredMapSlots;
         valueChanged("numConfiguredMapSlots");
     }
+
     public String getNumConfiguredReduceSlots() {
         return numConfiguredReduceSlots;
     }
+
     public void setNumConfiguredReduceSlots(String numConfiguredReduceSlots) {
         this.numConfiguredReduceSlots = numConfiguredReduceSlots;
         valueChanged("numConfiguredReduceSlots");
     }
+
     public String getHealthReport() {
         return healthReport;
     }
+
     public void setHealthReport(String healthReport) {
         this.healthReport = healthReport;
         valueChanged("healthReport");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java
index e4de886..f064edf 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java
@@ -26,5 +26,5 @@ public class TopologyEntityRepository extends EntityRepository {
         entitySet.add(HdfsServiceTopologyAPIEntity.class);
         entitySet.add(MRServiceTopologyAPIEntity.class);
         entitySet.add(JournalNodeServiceAPIEntity.class);
-   }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/pom.xml b/eagle-topology-check/pom.xml
index b26e32b..7d68f73 100644
--- a/eagle-topology-check/pom.xml
+++ b/eagle-topology-check/pom.xml
@@ -51,4 +51,16 @@
         </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    <failOnViolation>true</failOnViolation>
+                    <failsOnError>true</failsOnError>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file