You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/03 06:47:37 UTC
incubator-eagle git commit: [EAGLE-690] integrate topology health
check with alert engine
Repository: incubator-eagle
Updated Branches:
refs/heads/master 2bc25969a -> f108c7f45
[EAGLE-690] integrate topology health check with alert engine
integrate topology health check with alert engine
Author: yupu <yu...@ebay.com>
Closes #570 from puyulu/master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f108c7f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f108c7f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f108c7f4
Branch: refs/heads/master
Commit: f108c7f456a02d92c70479895f61d3ab724745f8
Parents: 2bc2596
Author: yupu <yu...@ebay.com>
Authored: Thu Nov 3 14:47:28 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Nov 3 14:47:28 2016 +0800
----------------------------------------------------------------------
.../apache/eagle/topology/TopologyCheckApp.java | 17 +++
.../eagle/topology/TopologyCheckAppConfig.java | 8 +-
.../topology/extractor/TopologyCrawler.java | 3 +-
.../extractor/TopologyEntityParser.java | 6 +-
.../extractor/TopologyEntityParserResult.java | 2 +-
.../hdfs/HdfsTopologyEntityParser.java | 14 +-
.../topology/extractor/mr/YarnNodeInfo.java | 21 ++-
.../extractor/mr/YarnNodeInfoWrapper.java | 2 +-
.../topology/extractor/mr/YarnNodeInfos.java | 2 +-
.../topology/resolver/TopologyRackResolver.java | 2 +-
.../impl/IPMaskTopologyRackResolver.java | 10 +-
.../topology/storm/HealthCheckParseBolt.java | 73 ++++++++++
.../topology/storm/TopologyDataPersistBolt.java | 72 +++++++++-
.../topology/utils/EntityBuilderHelper.java | 18 +--
.../eagle/topology/utils/JMXQueryHelper.java | 6 +-
.../utils/ServiceNotResponseException.java | 11 +-
.../eagle/topology/utils/StringUtils.java | 45 ++++++
....eagle.topology.TopologyCheckAppProvider.xml | 142 ++++++++++++-------
.../src/main/resources/application.conf | 33 +++--
.../entity/HBaseServiceTopologyAPIEntity.java | 4 +-
.../entity/HdfsServiceTopologyAPIEntity.java | 11 +-
.../entity/HealthCheckParseAPIEntity.java | 67 +++++++++
.../entity/JournalNodeServiceAPIEntity.java | 4 +-
.../entity/MRServiceTopologyAPIEntity.java | 11 +-
.../entity/TopologyEntityRepository.java | 2 +-
eagle-topology-check/pom.xml | 12 ++
26 files changed, 474 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
index 95bcb4d..93a06f8 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
@@ -23,16 +23,25 @@ import backtype.storm.topology.TopologyBuilder;
import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.topology.storm.HealthCheckParseBolt;
import org.apache.eagle.topology.storm.TopologyCheckAppSpout;
import org.apache.eagle.topology.storm.TopologyDataPersistBolt;
public class TopologyCheckApp extends StormApplication {
+
+ private static final String SINK_TASK_NUM = "topology.numOfSinkTasks";
+ private static final String TOPOLOGY_HEALTH_CHECK_STREAM = "topology_health_check_stream";
+
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config);
String spoutName = TopologyCheckAppConfig.TOPOLOGY_DATA_FETCH_SPOUT_NAME;
String persistBoltName = TopologyCheckAppConfig.TOPOLOGY_ENTITY_PERSIST_BOLT_NAME;
+ String parseBoltName = TopologyCheckAppConfig.PARSE_BOLT_NAME;
+ String kafkaSinkBoltName = TopologyCheckAppConfig.SINK_BOLT_NAME;
+ int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout(
@@ -47,6 +56,14 @@ public class TopologyCheckApp extends StormApplication {
topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt
).setNumTasks(topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(spoutName);
+ topologyBuilder.setBolt(
+ parseBoltName,
+ new HealthCheckParseBolt(),
+ topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(persistBoltName);
+
+ StormStreamSink<?> sinkBolt = environment.getStreamSink(TOPOLOGY_HEALTH_CHECK_STREAM, config);
+ topologyBuilder.setBolt(kafkaSinkBoltName, sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks).shuffleGrouping(parseBoltName);
+
return topologyBuilder.createTopology();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index 409a87f..0b7cb3d 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -32,6 +32,8 @@ public class TopologyCheckAppConfig implements Serializable {
public static final String TOPOLOGY_DATA_FETCH_SPOUT_NAME = "topologyDataFetcherSpout";
public static final String TOPOLOGY_ENTITY_PERSIST_BOLT_NAME = "topologyEntityPersistBolt";
+ public static final String PARSE_BOLT_NAME = "parserBolt";
+ public static final String SINK_BOLT_NAME = "sinkBolt";
private static final int MAX_NUM_THREADS = 10;
private static final String HBASE_ZOOKEEPER_CLIENT_PORT = "2181";
@@ -98,7 +100,7 @@ public class TopologyCheckAppConfig implements Serializable {
if (config.hasPath("dataSourceConfig.mr")) {
topologyTypes.add(TopologyConstants.TopologyType.MR);
mrConfig = new MRConfig();
- mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*");
+ mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*");
mrConfig.historyServerUrl = getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null);
}
@@ -129,12 +131,12 @@ public class TopologyCheckAppConfig implements Serializable {
}
public static class MRConfig implements Serializable {
- public String [] rmUrls;
+ public String[] rmUrls;
public String historyServerUrl;
}
public static class HdfsConfig implements Serializable {
- public String [] namenodeUrls;
+ public String[] namenodeUrls;
}
private String getOptionalConfig(String key, String defaultValue) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java
index 55eba15..2451db6 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java
@@ -21,7 +21,8 @@ package org.apache.eagle.topology.extractor;
public interface TopologyCrawler {
/**
- * Fetch raw data and emit the parsed result to the next bolt
+ * Fetch raw data and emit the parsed result to the next bolt.
+ *
* @return
*/
void extract();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
index 6a860ad..64c9bd4 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
@@ -24,19 +24,19 @@ import java.io.IOException;
public interface TopologyEntityParser {
/**
- * Parse hadoop topology and return the topology entity results
+ * Parse hadoop topology and return the topology entity results.
* @return the topology entity result
*/
public TopologyEntityParserResult parse(long timestamp) throws IOException;
/**
- * Get topology type for the parser
+ * Get topology type for the parser.
* @return topology type
*/
public TopologyConstants.TopologyType getTopologyType();
/**
- * Get hadoop version for the parser
+ * Get hadoop version for the parser.
* @return
*/
public TopologyConstants.HadoopVersion getHadoopVersion();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
index f5746f7..1799054 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
@@ -21,7 +21,6 @@ package org.apache.eagle.topology.extractor;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.topology.TopologyConstants;
import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
-
import java.util.ArrayList;
import java.util.List;
@@ -58,6 +57,7 @@ public class TopologyEntityParserResult {
public TopologyConstants.HadoopVersion getVersion() {
return version;
}
+
public void setVersion(TopologyConstants.HadoopVersion version) {
this.version = version;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
index 1cec474..c35ed18 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
@@ -47,7 +47,7 @@ import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
public class HdfsTopologyEntityParser implements TopologyEntityParser {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class);
- private String [] namenodeUrls;
+ private String[] namenodeUrls;
private String site;
private TopologyRackResolver rackResolver;
@@ -78,7 +78,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)";
private static final String QJM_PATTERN = "([\\d\\.]+):\\d+";
-
+
private static final double TB = 1024 * 1024 * 1024 * 1024;
public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) {
@@ -118,9 +118,9 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
if (bean == null || bean.getPropertyMap() == null) {
throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!");
}
- final String hostname = (String)bean.getPropertyMap().get(HA_NAME);
+ final String hostname = (String) bean.getPropertyMap().get(HA_NAME);
HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
- final String state = (String)bean.getPropertyMap().get(HA_STATE);
+ final String state = (String) bean.getPropertyMap().get(HA_STATE);
result.setStatus(state);
final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB);
result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024));
@@ -155,9 +155,9 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
JSONObject jsonMap = (JSONObject) jsonArray.get(0);
Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>();
- String QJM = jsonMap.getString("manager");
+ String manager = jsonMap.getString("manager");
Pattern qjm = Pattern.compile(QJM_PATTERN);
- Matcher jpmMatcher = qjm.matcher(QJM);
+ Matcher jpmMatcher = qjm.matcher(manager);
while (jpmMatcher.find()) {
String ip = jpmMatcher.group(1);
String hostname = EntityBuilderHelper.resolveHostByIp(ip);
@@ -196,7 +196,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
int numDeadDecommNodes = 0;
String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES);
- JSONTokener tokener = new JSONTokener(deadNodesStrings);
+ JSONTokener tokener = new JSONTokener(deadNodesStrings);
JSONObject jsonNodesObject = new JSONObject(tokener);
final JSONArray deadNodes = jsonNodesObject.names();
for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
index 9315f78..4ca2b06 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
@@ -21,7 +21,7 @@ package org.apache.eagle.topology.extractor.mr;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class YarnNodeInfo {
@@ -40,60 +40,79 @@ public class YarnNodeInfo {
public String getRack() {
return rack;
}
+
public void setRack(String rack) {
this.rack = rack;
}
+
public String getState() {
return state;
}
+
public void setState(String state) {
this.state = state;
}
+
public String getId() {
return id;
}
+
public void setId(String id) {
this.id = id;
}
+
public String getNodeHostName() {
return nodeHostName;
}
+
public void setNodeHostName(String nodeHostName) {
this.nodeHostName = nodeHostName;
}
+
public String getNodeHTTPAddress() {
return nodeHTTPAddress;
}
+
public void setNodeHTTPAddress(String nodeHTTPAddress) {
this.nodeHTTPAddress = nodeHTTPAddress;
}
+
public String getLastHealthUpdate() {
return lastHealthUpdate;
}
+
public void setLastHealthUpdate(String lastHealthUpdate) {
this.lastHealthUpdate = lastHealthUpdate;
}
+
public String getHealthReport() {
return healthReport;
}
+
public void setHealthReport(String healthReport) {
this.healthReport = healthReport;
}
+
public String getNumContainers() {
return numContainers;
}
+
public void setNumContainers(String numContainers) {
this.numContainers = numContainers;
}
+
public String getUsedMemoryMB() {
return usedMemoryMB;
}
+
public void setUsedMemoryMB(String usedMemoryMB) {
this.usedMemoryMB = usedMemoryMB;
}
+
public String getAvailMemoryMB() {
return availMemoryMB;
}
+
public void setAvailMemoryMB(String availMemoryMB) {
this.availMemoryMB = availMemoryMB;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
index 83d8d7f..8079e49 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
@@ -21,7 +21,7 @@ package org.apache.eagle.topology.extractor.mr;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class YarnNodeInfoWrapper {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
index d715a1e..536edd8 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
@@ -24,7 +24,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.util.List;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class YarnNodeInfos {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
index ab7b3cc..c4b4976 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
@@ -21,7 +21,7 @@ package org.apache.eagle.topology.resolver;
public interface TopologyRackResolver {
/**
- *resolve rack by hostname
+ *resolve rack by hostname.
* @return rack name
*/
String resolve(String hostname);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
index df0f863..821de3c 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
@@ -24,19 +24,19 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
/**
- * resolve rack by hostname
+ * resolve rack by hostname.
*/
public class IPMaskTopologyRackResolver implements TopologyRackResolver {
- private final int DEFAULT_RACK_POS = 2;
+ private final int pos = 2;
private int rackPos;
public IPMaskTopologyRackResolver() {
- this.rackPos = DEFAULT_RACK_POS;
+ this.rackPos = pos;
}
public IPMaskTopologyRackResolver(int rackPos) {
- this.rackPos = (rackPos > 3 || rackPos < 0) ? DEFAULT_RACK_POS : rackPos;
+ this.rackPos = (rackPos > 3 || rackPos < 0) ? pos : rackPos;
}
@Override
@@ -44,7 +44,7 @@ public class IPMaskTopologyRackResolver implements TopologyRackResolver {
String result = null;
try {
InetAddress address = InetAddress.getByName(hostname);
- result = "rack" + (int)(address.getAddress()[rackPos] & 0xff);
+ result = "rack" + (int) (address.getAddress()[rackPos] & 0xff);
} catch (UnknownHostException e) {
//e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java
new file mode 100644
index 0000000..a0b282b
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eagle.topology.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class HealthCheckParseBolt extends BaseRichBolt {
+
+ private static Logger LOG = LoggerFactory.getLogger(HealthCheckParseBolt.class);
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+ this.collector = outputCollector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ HealthCheckParseAPIEntity result = null;
+ try {
+ result = (HealthCheckParseAPIEntity) tuple.getValueByField("f1");
+ Map<String, Object> map = new TreeMap<>();
+ map.put("status", result.getStatus());
+ map.put("timestamp", result.getTimeStamp());
+ map.put("role", result.getRole());
+ map.put("host", result.getHost());
+ map.put("site", result.getSite());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("emitted " + map);
+ }
+ collector.emit(Arrays.asList(result.getHost(), map));
+ } catch (Exception ex) {
+ LOG.error("Failing parse security log message, and ignore this message", ex);
+ } finally {
+ collector.ack(tuple);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
index 9b0eb82..e0ff3cd 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -18,11 +18,20 @@
package org.apache.eagle.topology.storm;
+import static org.apache.eagle.topology.TopologyConstants.HOSTNAME_TAG;
+import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
+import static org.apache.eagle.topology.TopologyConstants.ROLE_TAG;
+import static org.apache.eagle.topology.TopologyConstants.SITE_TAG;
+
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.EagleServiceClientException;
@@ -32,6 +41,10 @@ import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.apache.eagle.topology.TopologyCheckAppConfig;
import org.apache.eagle.topology.TopologyConstants;
import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +66,8 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig()));
+ this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig().getString("service.host"), this.config.getConfig().getInt("service.port"),
+ this.config.getConfig().getString("service.username"), this.config.getConfig().getString("service.password")));
this.collector = collector;
}
@@ -84,9 +98,10 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
deleteEntities(entitiesForDeletion, serviceName);
writeEntities(entitiesToWrite, serviceName);
writeEntities(result.getMetrics(), serviceName);
+ emitToKafkaBolt(result);
this.collector.ack(input);
} catch (Exception e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
this.collector.fail(input);
}
}
@@ -100,7 +115,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
+ declarer.declare(new Fields("f1"));
}
private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) {
@@ -112,9 +127,9 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName);
}
} catch (EagleServiceClientException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
} catch (IOException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
entities.clear();
}
@@ -135,8 +150,51 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
private String generateKey(TopologyBaseAPIEntity entity) {
return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
- entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
- entity.getTags().get(TopologyConstants.ROLE_TAG));
+ entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
+ entity.getTags().get(TopologyConstants.ROLE_TAG));
}
+ private void emitToKafkaBolt(TopologyEntityParserResult result) {
+
+ List<HealthCheckParseAPIEntity> healthCheckParseAPIList = new ArrayList<HealthCheckParseAPIEntity>();
+
+ setNodeInfo(result.getMasterNodes(), healthCheckParseAPIList);
+
+ setNodeInfo(result.getSlaveNodes(), healthCheckParseAPIList);
+
+ for (HealthCheckParseAPIEntity healthCheckAPIEntity : healthCheckParseAPIList) {
+ this.collector.emit(new Values(healthCheckAPIEntity));
+ }
+
+ }
+
+ private void setNodeInfo(List<TopologyBaseAPIEntity> topologyBaseAPIList, List<HealthCheckParseAPIEntity> healthCheckParseAPIList) {
+ HealthCheckParseAPIEntity healthCheckAPIEntity = null;
+ for (Iterator<TopologyBaseAPIEntity> iterator = topologyBaseAPIList.iterator(); iterator.hasNext(); ) {
+
+ healthCheckAPIEntity = new HealthCheckParseAPIEntity();
+ TopologyBaseAPIEntity topologyBaseAPIEntity = iterator.next();
+
+ if (topologyBaseAPIEntity instanceof HBaseServiceTopologyAPIEntity) {
+
+ healthCheckAPIEntity.setStatus(((HBaseServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
+
+ }
+ if (topologyBaseAPIEntity instanceof HdfsServiceTopologyAPIEntity) {
+
+ healthCheckAPIEntity.setStatus(((HdfsServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
+ }
+
+ if (topologyBaseAPIEntity instanceof MRServiceTopologyAPIEntity) {
+
+ healthCheckAPIEntity.setStatus(((MRServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
+ }
+
+ healthCheckAPIEntity.setTimeStamp(topologyBaseAPIEntity.getTimestamp());
+ healthCheckAPIEntity.setHost(topologyBaseAPIEntity.getTags().get(HOSTNAME_TAG));
+ healthCheckAPIEntity.setRole(topologyBaseAPIEntity.getTags().get(ROLE_TAG));
+ healthCheckAPIEntity.setSite(topologyBaseAPIEntity.getTags().get(SITE_TAG));
+ healthCheckParseAPIList.add(healthCheckAPIEntity);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
index 9bad73b..f390fa8 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
@@ -21,16 +21,12 @@ package org.apache.eagle.topology.utils;
import org.apache.commons.lang.StringUtils;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
-import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.eagle.topology.TopologyConstants.*;
-
public class EntityBuilderHelper {
public static String resolveHostByIp(String ip) {
@@ -48,7 +44,7 @@ public class EntityBuilderHelper {
metricEntity.setTimestamp(timestamp);
metricEntity.setTags(tags);
metricEntity.setPrefix(metricName);
- metricEntity.setValue(new double[]{value});
+ metricEntity.setValue(new double[] {value});
return metricEntity;
}
@@ -59,12 +55,12 @@ public class EntityBuilderHelper {
String metricName = String.format(TopologyConstants.METRIC_LIVE_RATIO_NAME_FORMAT, role);
return EntityBuilderHelper.metricWrapper(timestamp, metricName, value, tags);
}
-
- public static String getValidHostName(String key){
- if(StringUtils.isBlank(key)){
- throw new IllegalArgumentException("key can not be empty");
- }
- return key.indexOf(TopologyConstants.COLON) > 0 ? key.substring(0,key.indexOf(TopologyConstants.COLON)) : key;
+
+ public static String getValidHostName(String key) {
+ if (StringUtils.isBlank(key)) {
+ throw new IllegalArgumentException("key can not be empty");
+ }
+ return key.indexOf(TopologyConstants.COLON) > 0 ? key.substring(0, key.indexOf(TopologyConstants.COLON)) : key;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
index fa4c71f..bca8485 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
@@ -34,7 +34,7 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Helper class to query Hadoop JMX servlets
+ * Helper class to query Hadoop JMX servlets.
*/
public final class JMXQueryHelper {
@@ -66,13 +66,13 @@ public final class JMXQueryHelper {
final JSONArray jsonArray = jsonBeansObject.getJSONArray("beans");
int size = jsonArray.length();
for (int i = 0; i < size; ++i) {
- final JSONObject obj = (JSONObject)jsonArray.get(i);
+ final JSONObject obj = (JSONObject) jsonArray.get(i);
final JMXBean bean = new JMXBean();
final Map<String, Object> map = new HashMap<String, Object>();
bean.setPropertyMap(map);
final JSONArray names = obj.names();
int jsonSize = names.length();
- for (int j = 0 ; j < jsonSize; ++j) {
+ for (int j = 0; j < jsonSize; ++j) {
final String key = names.getString(j);
Object value = obj.get(key);
map.put(key, value);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
index 48c9133..3204ca3 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
@@ -25,14 +25,14 @@ public class ServiceNotResponseException extends IOException {
private static final long serialVersionUID = -2425311876734366496L;
/**
- * Default constructor of FeederException
+ * Default constructor of ServiceNotResponseException.
*/
public ServiceNotResponseException() {
super();
}
/**
- * Constructor of FeederException
+ * Constructor of ServiceNotResponseException.
*
* @param message error message
*/
@@ -41,18 +41,17 @@ public class ServiceNotResponseException extends IOException {
}
/**
- * Constructor of FeederException
+ * Constructor of ServiceNotResponseException.
*
* @param message error message
- * @param cause the cause of the exception
- *
+ * @param cause the cause of the exception
*/
public ServiceNotResponseException(String message, Throwable cause) {
super(message, cause);
}
/**
- * Constructor of FeederException
+ * Constructor of ServiceNotResponseException.
*
* @param cause the cause of the exception
*/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java
new file mode 100644
index 0000000..3b47f84
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public final class StringUtils { // static string helpers; final with a private constructor, so never instantiated
+ // Date/time pattern string (looks like a SimpleDateFormat-style pattern - confirm with callers); not referenced inside this class.
+ public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ // Private constructor: the class only exposes static members.
+ private StringUtils() {
+ // intentionally empty
+ }
+ /** Joins the entries as "key:value" pairs separated by ",", in the map's iteration order; an empty map yields "". Throws NullPointerException if tags is null. */
+ public static String convertMapToString(Map<String, String> tags) {
+ StringBuilder tagBuilder = new StringBuilder();
+ Iterator<Entry<String, String>> iter = tags.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, String> entry = (Map.Entry<String, String>) iter.next(); // cast is redundant: iter is already typed
+ tagBuilder.append(entry.getKey() + ":" + entry.getValue()); // a null key or value is rendered as the text "null"
+ if (iter.hasNext()) { // separator goes between entries only, never after the last one
+ tagBuilder.append(",");
+ }
+ }
+ return tagBuilder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index ed31a8f..8dbf79e 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -22,128 +22,166 @@
<name>Topology Health Check</name>
<version>0.5.0-incubating</version>
<configuration>
- <!-- org.apache.eagle.topology.TopologyCheckApp -->
+ <!-- org.apache.eagle.topology.TopologyCheckApp -->
<property>
<name>dataExtractorConfig.site</name>
<displayName>site</displayName>
<description>Site</description>
<value>sandbox</value>
- </property>
+ </property>
<property>
<name>dataExtractorConfig.fetchDataIntervalInSecs</name>
- <displayName>FetchDataIntervalInSecs</displayName>
+ <displayName>Fetch Data Interval in Secs</displayName>
<description>Fetch Data Interval in Secs</description>
<value>300</value>
</property>
<property>
<name>dataExtractorConfig.parseThreadPoolSize</name>
- <displayName>parseThreadPoolSize</displayName>
+ <displayName>Parser Thread Pool Size</displayName>
<description>Parser Thread Pool Size</description>
<value>5</value>
</property>
<property>
<name>dataExtractorConfig.numDataFetcherSpout</name>
- <displayName>numDataFetcherSpout</displayName>
+ <displayName>Spout Task Number</displayName>
<description>Spout Task Number</description>
<value>1</value>
</property>
<property>
<name>dataExtractorConfig.numEntityPersistBolt</name>
- <displayName>numEntityPersistBolt</displayName>
+ <displayName>Bolt Task Number</displayName>
<description>Bolt Task Number</description>
<value>1</value>
- </property>
+ </property>
+
+ <property>
+ <name>dataExtractorConfig.rackResolverCls</name>
+ <displayName>Rack Resolver Class</displayName>
+ <description>rack resolver class</description>
+ <value>org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver</value>
+ </property>
+
<property>
<name>dataSourceConfig.hbase.zkQuorum</name>
- <displayName>zkQuorum</displayName>
- <description>Zookeeper Quorum</description>
+ <displayName>HBase Zookeeper Quorum</displayName>
+ <description>HBase Zookeeper Quorum</description>
<value>sandbox.hortonworks.com:2181</value>
+ <required>true</required>
</property>
<property>
<name>dataSourceConfig.hbase.zkZnodeParent</name>
- <displayName>zkZnodeParent</displayName>
+ <displayName>Hbase Zookeeper Znode Parent Root</displayName>
<description>Hbase Zookeeper Znode Parent Root</description>
<value>/hbase-unsecure</value>
+ <required>true</required>
</property>
<property>
<name>dataSourceConfig.hbase.zkPropertyClientPort</name>
- <displayName>zkPropertyClientPort</displayName>
+ <displayName>Hbase Zookeeper Client Port</displayName>
<description>Hbase Zookeeper Client Port</description>
<value>2181</value>
+ <required>true</required>
</property>
<property>
<name>dataSourceConfig.hbase.kerberos.master.principal</name>
- <displayName>hbaseMasterPrincipal</displayName>
+ <displayName>Hbase Master Principal</displayName>
<description>Hbase Master Principal</description>
<value>hadoop/_HOST@EXAMPLE.COM</value>
</property>
- <property>
- <name>dataSourceConfig.hbase.kerberos.eagle.keytab</name>
- <displayName>eagleKeytab</displayName>
- <description>Eagle keytab</description>
- <value></value>
- </property>
+
<property>
<name>dataSourceConfig.hbase.kerberos.master.principal</name>
- <displayName>hbaseMasterPrincipal</displayName>
+ <displayName>Hbase Master Principal</displayName>
<description>Hbase Master Principal</description>
<value>hadoop/_HOST@EXAMPLE.COM</value>
- </property>
-
+ </property>
+
<property>
<name>dataSourceConfig.hdfs.namenodeUrl</name>
- <displayName>hdfsNamenodeUrl</displayName>
+ <displayName>Hdfs Namenode Web URL</displayName>
<description>Hdfs Namenode Web URL</description>
<value>http://sandbox.hortonworks.com:50070</value>
+ <required>true</required>
</property>
-
<property>
<name>dataSourceConfig.mr.rmUrl</name>
- <displayName>resourceManagerUrl</displayName>
+ <displayName>Resource Manager URL</displayName>
<description>Resource Manager URL</description>
<value>http://sandbox.hortonworks.com:8088</value>
+ <required>true</required>
</property>
<property>
<name>dataSourceConfig.mr.historyServerUrl</name>
- <displayName>historyServerUrl</displayName>
- <description>History Server URL</description>
- <value></value>
- </property>
-
- <property>
- <name>eagleProps.eagleService.host</name>
- <description>eagleProps.eagleService.host</description>
- <value>localhost</value>
- </property>
- <property>
- <name>eagleProps.eagleService.port</name>
- <description>eagleProps.eagleService.port</description>
- <value>9090</value>
+ <displayName>History Server URL</displayName>
+ <description>History Server URL</description>
+ <value></value>
+ </property>
+
+ <property>
+ <name>topology.numOfSinkTasks</name>
+ <displayName>topology.numOfSinkTasks</displayName>
+ <value>2</value>
+ <description>number of sink tasks</description>
</property>
+
+ <!-- data sink configurations -->
<property>
- <name>eagleProps.eagleService.username</name>
- <description>eagleProps.eagleService.username</description>
- <value>admin</value>
+ <name>dataSinkConfig.topic</name>
+ <displayName>Topic For Kafka Data Sink</displayName>
+ <value>topology_health_check</value>
+ <description>Topic For Kafka Data Sink</description>
</property>
<property>
- <name>eagleProps.eagleService.password</name>
- <description>eagleProps.eagleService.password</description>
- <value>secret</value>
+ <name>dataSinkConfig.brokerList</name>
+ <displayName>Kafka Broker List</displayName>
+ <value>server.eagle.apache.org:9092</value>
+ <description>Kafka Broker List</description>
</property>
<property>
- <name>eagleProps.eagleService.basePath</name>
- <description>eagleProps.eagleService.basePath</description>
- <value>/rest</value>
+ <name>dataSinkConfig.serializerClass</name>
+ <displayName>Serializer Class Kafka Message Value</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>Serializer Class Kafka Message Value</description>
</property>
<property>
- <name>eagleProps.eagleService.readTimeOutSeconds</name>
- <displayName>eagleProps.eagleService.readTimeOutSeconds</displayName>
- <description>The maximum amount of time (in seconds) the app is trying to read from eagle service</description>
- <value>2</value>
+ <name>dataSinkConfig.keySerializerClass</name>
+ <displayName>Serializer Class Kafka Message Key</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>Serializer Class Kafka Message Key</description>
</property>
-
+
</configuration>
+ <streams>
+ <stream>
+ <streamId>topology_health_check_stream</streamId>
+ <description>topology health check Stream</description>
+ <validate>true</validate>
+ <timeseries>true</timeseries>
+ <columns>
+ <column>
+ <name>status</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>timestamp</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>host</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>site</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>role</name>
+ <type>string</type>
+ </column>
+ </columns>
+ </stream>
+ </streams>
<docs>
<install>
</install>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
index cbc7ac1..f069df5 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
@@ -18,7 +18,11 @@
mode : "LOCAL",
workers : 1,
- dataExtractorConfig : {
+ topology : {
+ "numOfSinkTasks" : 2
+ }
+
+ dataExtractorConfig : {
"site": "sandbox",
"fetchDataIntervalInSecs": 300,
"parseThreadPoolSize": 5,
@@ -41,20 +45,23 @@
}
},
mr: {
- rmUrl: "http://sandbox.hortonworks.com:8088",
+ rmUrl: "http://sandbox.hortonworks.com:50030",
historyServerUrl : "http://sandbox.hortonworks.com:19888" #if not need, then empty
}
}
-
- eagleProps : {
- "mailHost" : "abc.com",
- "mailDebug" : "true",
- eagleService.host:"localhost",
- eagleService.port: 9090,
- eagleService.username: "admin",
- eagleService.password : "secret",
- eagleService.basePath : "/rest",
- eagleService.readTimeOutSeconds : 20,
- eagleService.maxFlushNum : 500
+
+ "dataSinkConfig": {
+ "topic" : "topology_health_check",
+ "brokerList" : "sandbox.hortonworks.com:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
+ }
+
+ "service": {
+ "host": "localhost",
+ "port": 9090,
+ "username": "admin",
+ "password": "secret",
+ "readTimeOutSeconds" : 10,
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java
index 24761dc..d91ff5c 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java
@@ -22,13 +22,13 @@ import org.apache.eagle.log.entity.meta.*;
import org.apache.eagle.topology.TopologyConstants;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include= JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("hadoop_topology")
@ColumnFamily("f")
@Prefix("hbaseservicestatus")
@Service(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME)
@TimeSeries(false)
-public class HBaseServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
+public class HBaseServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
@Column("a")
private String status;
@Column("b")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java
index 0ef6d0a..baca97b 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java
@@ -22,7 +22,7 @@ import org.apache.eagle.log.entity.meta.*;
import org.apache.eagle.topology.TopologyConstants;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("hadoop_topology")
@ColumnFamily("f")
@Prefix("hdfsservicestatus")
@@ -56,34 +56,43 @@ public class HdfsServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
public String getNumFailedVolumes() {
return numFailedVolumes;
}
+
public void setNumFailedVolumes(String numFailedVolumes) {
this.numFailedVolumes = numFailedVolumes;
valueChanged("numFailedVolumes");
}
+
public String getNumBlocks() {
return numBlocks;
}
+
public void setNumBlocks(String numBlocks) {
this.numBlocks = numBlocks;
valueChanged("numBlocks");
}
+
public String getStatus() {
return status;
}
+
public void setStatus(String status) {
this.status = status;
valueChanged("status");
}
+
public String getConfiguredCapacityTB() {
return configuredCapacityTB;
}
+
public void setConfiguredCapacityTB(String configuredCapacityTB) {
this.configuredCapacityTB = configuredCapacityTB;
valueChanged("configuredCapacityTB");
}
+
public String getUsedCapacityTB() {
return usedCapacityTB;
}
+
public void setUsedCapacityTB(String usedCapacityTB) {
this.usedCapacityTB = usedCapacityTB;
valueChanged("usedCapacityTB");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java
new file mode 100644
index 0000000..d78fea6
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eagle.topology.entity;
+
+public class HealthCheckParseAPIEntity { // plain mutable holder for one health-check record
+ // No validation or derived state: every accessor below reads or writes its field directly.
+ private String status; // status string reported for the service instance; null until set
+ private long timeStamp; // time of the observation; unit is not fixed by this class - TODO confirm with the producer
+ private String host; // host name of the checked node
+ private String role; // role of the node within the topology
+ private String site; // site the node belongs to
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+ // Note the capital S: these accessors are getTimeStamp/setTimeStamp, not getTimestamp/setTimestamp.
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(long timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getRole() {
+ return role;
+ }
+
+ public void setRole(String role) {
+ this.role = role;
+ }
+
+ public String getSite() {
+ return site;
+ }
+
+ public void setSite(String site) {
+ this.site = site;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java
index f21ad7a..392b107 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java
@@ -22,13 +22,13 @@ import org.apache.eagle.log.entity.meta.*;
import org.apache.eagle.topology.TopologyConstants;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("hadoop_topology")
@ColumnFamily("f")
@Prefix("journalnodestatus")
@Service(TopologyConstants.JN_INSTANCE_SERVICE_NAME)
@TimeSeries(false)
-@Tags({TopologyConstants.SITE_TAG, TopologyConstants.HOSTNAME_TAG, TopologyConstants.RACK_TAG, TopologyConstants.ROLE_TAG})
+@Tags( {TopologyConstants.SITE_TAG, TopologyConstants.HOSTNAME_TAG, TopologyConstants.RACK_TAG, TopologyConstants.ROLE_TAG})
public class JournalNodeServiceAPIEntity extends TopologyBaseAPIEntity {
@Column("a")
private long writtenTxidDiff;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
index 75dfbfb..82e57fd 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
@@ -22,13 +22,13 @@ import org.apache.eagle.log.entity.meta.*;
import org.apache.eagle.topology.TopologyConstants;
import org.codehaus.jackson.map.annotate.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("hadoop_topology")
@ColumnFamily("f")
@Prefix("mrservicestatus")
@Service(TopologyConstants.MR_INSTANCE_SERVICE_NAME)
@TimeSeries(false)
-public class MRServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
+public class MRServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
@Column("a")
private String status;
@Column("b")
@@ -52,27 +52,34 @@ public class MRServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
public String getStatus() {
return status;
}
+
public void setStatus(String status) {
this.status = status;
valueChanged("status");
}
+
public String getNumConfiguredMapSlots() {
return numConfiguredMapSlots;
}
+
public void setNumConfiguredMapSlots(String numConfiguredMapSlots) {
this.numConfiguredMapSlots = numConfiguredMapSlots;
valueChanged("numConfiguredMapSlots");
}
+
public String getNumConfiguredReduceSlots() {
return numConfiguredReduceSlots;
}
+
public void setNumConfiguredReduceSlots(String numConfiguredReduceSlots) {
this.numConfiguredReduceSlots = numConfiguredReduceSlots;
valueChanged("numConfiguredReduceSlots");
}
+
public String getHealthReport() {
return healthReport;
}
+
public void setHealthReport(String healthReport) {
this.healthReport = healthReport;
valueChanged("healthReport");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java
index e4de886..f064edf 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java
@@ -26,5 +26,5 @@ public class TopologyEntityRepository extends EntityRepository {
entitySet.add(HdfsServiceTopologyAPIEntity.class);
entitySet.add(MRServiceTopologyAPIEntity.class);
entitySet.add(JournalNodeServiceAPIEntity.class);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/pom.xml b/eagle-topology-check/pom.xml
index b26e32b..7d68f73 100644
--- a/eagle-topology-check/pom.xml
+++ b/eagle-topology-check/pom.xml
@@ -51,4 +51,16 @@
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <failOnViolation>true</failOnViolation>
+ <failsOnError>true</failsOnError>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file