You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/10/10 03:26:02 UTC
[2/3] incubator-eagle git commit: [EAGLE-545] hdfs/bhase/yarn
topology health check
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
new file mode 100644
index 0000000..f2cad37
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.hdfs;
+
+import org.apache.eagle.app.utils.PathResolverHelper;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.*;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
+
+public class HdfsTopologyEntityParser implements TopologyEntityParser {
+
+ private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class);
+ private String [] namenodeUrls;
+ private String site;
+ private TopologyRackResolver rackResolver;
+
+ private static final String JMX_URL = "/jmx?anonymous=true";
+ private static final String JMX_FS_NAME_SYSTEM_BEAN_NAME = "Hadoop:service=NameNode,name=FSNamesystem";
+ private static final String JMX_NAMENODE_INFO = "Hadoop:service=NameNode,name=NameNodeInfo";
+
+ private static final String HA_STATE = "tag.HAState";
+ private static final String HA_NAME = "tag.Hostname";
+ private static final String CAPACITY_TOTAL_GB = "CapacityTotalGB";
+ private static final String CAPACITY_USED_GB = "CapacityUsedGB";
+ private static final String BLOCKS_TOTAL = "BlocksTotal";
+ private static final String LIVE_NODES = "LiveNodes";
+ private static final String DEAD_NODES = "DeadNodes";
+
+ private static final String JN_STATUS = "NameJournalStatus";
+ private static final String JN_TRANSACTION_INFO = "JournalTransactionInfo";
+ private static final String LAST_TX_ID = "LastAppliedOrWrittenTxId";
+
+ private static final String DATA_NODE_NUM_BLOCKS = "numBlocks";
+ private static final String DATA_NODE_USED_SPACE = "usedSpace";
+ private static final String DATA_NODE_CAPACITY = "capacity";
+ private static final String DATA_NODE_ADMIN_STATE = "adminState";
+ private static final String DATA_NODE_FAILED_VOLUMN = "volfails";
+
+ private static final String DATA_NODE_DECOMMISSIONED = "Decommissioned";
+ private static final String DATA_NODE_DECOMMISSIONED_STATE = "decommissioned";
+
+ private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)";
+ private static final String QJM_PATTERN = "([\\d\\.]+):\\d+";
+
+ public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) {
+ this.namenodeUrls = hdfsConfig.namenodeUrls;
+ this.site = site;
+ this.rackResolver = rackResolver;
+ }
+
+ @Override
+ public TopologyEntityParserResult parse(long timestamp) throws IOException {
+ final TopologyEntityParserResult result = new TopologyEntityParserResult();
+ result.setVersion(TopologyConstants.HadoopVersion.V2);
+ int numNamenode = 0;
+ for (String url : namenodeUrls) {
+ try {
+ final HdfsServiceTopologyAPIEntity namenodeEntity = createNamenodeEntity(url, timestamp);
+ result.getMasterNodes().add(namenodeEntity);
+ numNamenode++;
+ if (namenodeEntity.getStatus().equalsIgnoreCase(NAME_NODE_ACTIVE_STATUS)) {
+ createSlaveNodeEntities(url, timestamp, result);
+ }
+ } catch (RuntimeException ex) {
+ ex.printStackTrace();
+ } catch (IOException e) {
+ LOG.warn("Catch an IOException with url: {}", url);
+ }
+ }
+ double value = numNamenode * 1d / namenodeUrls.length;
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NAME_NODE_ROLE, value, site, timestamp));
+ return result;
+ }
+
+ private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws JSONException, IOException {
+ final String urlString = buildFSNamesystemURL(url);
+ final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
+ final JMXBean bean = jmxBeanMap.get(JMX_FS_NAME_SYSTEM_BEAN_NAME);
+ if (bean == null || bean.getPropertyMap() == null) {
+ throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!");
+ }
+ final String hostname = (String)bean.getPropertyMap().get(HA_NAME);
+ HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
+ final String state = (String)bean.getPropertyMap().get(HA_STATE);
+ result.setStatus(state);
+ final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB);
+ result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024));
+ final Double capacityUsedGB = (Double) bean.getPropertyMap().get(CAPACITY_USED_GB);
+ result.setUsedCapacityTB(Double.toString(capacityUsedGB / 1024));
+ final Integer blocksTotal = (Integer) bean.getPropertyMap().get(BLOCKS_TOTAL);
+ result.setNumBlocks(Integer.toString(blocksTotal));
+ return result;
+ }
+
+ private void createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws IOException {
+ final String urlString = buildNamenodeInfo(url);
+ final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
+ final JMXBean bean = jmxBeanMap.get(JMX_NAMENODE_INFO);
+ if (bean == null || bean.getPropertyMap() == null) {
+ throw new ServiceNotResponseException("Invalid JMX format, NameNodeInfo bean is null!");
+ }
+ createAllDataNodeEntities(bean, updateTime, result);
+ createAllJournalNodeEntities(bean, updateTime, result);
+ }
+
+ private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws UnknownHostException {
+ if (bean.getPropertyMap().get(JN_TRANSACTION_INFO) == null || bean.getPropertyMap().get(JN_STATUS) == null) {
+ return;
+ }
+ String jnInfoString = (String) bean.getPropertyMap().get(JN_TRANSACTION_INFO);
+ JSONObject jsonObject = new JSONObject(jnInfoString);
+ long lastTxId = Long.parseLong(jsonObject.getString(LAST_TX_ID));
+
+ String journalnodeString = (String) bean.getPropertyMap().get(JN_STATUS);
+ JSONArray jsonArray = new JSONArray(journalnodeString);
+ JSONObject jsonMap = (JSONObject) jsonArray.get(0);
+
+ Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>();
+ String QJM = jsonMap.getString("manager");
+ Pattern qjm = Pattern.compile(QJM_PATTERN);
+ Matcher jpmMatcher = qjm.matcher(QJM);
+ while (jpmMatcher.find()) {
+ String ip = jpmMatcher.group(1);
+ String hostname = EntityBuilderHelper.resolveHostByIp(ip);
+ HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.JOURNAL_NODE_ROLE, hostname, updateTime);
+ entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
+ journalNodesMap.put(ip, entity);
+ }
+ if (journalNodesMap.isEmpty()) {
+ LOG.warn("Fail to find journal node info in JMX");
+ return;
+ }
+
+ String stream = jsonMap.getString("stream");
+ Pattern status = Pattern.compile(STATUS_PATTERN);
+ Matcher statusMatcher = status.matcher(stream);
+ long numLiveJournalNodes = 0;
+ while (statusMatcher.find()) {
+ numLiveJournalNodes++;
+ String ip = statusMatcher.group(1);
+ if (journalNodesMap.containsKey(ip)) {
+ long txid = Long.parseLong(statusMatcher.group(2));
+ journalNodesMap.get(ip).setWrittenTxidDiff(lastTxId - txid);
+ journalNodesMap.get(ip).setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
+ }
+ }
+ result.getMasterNodes().addAll(journalNodesMap.values());
+
+ double value = numLiveJournalNodes * 1d / journalNodesMap.size();
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.JOURNAL_NODE_ROLE, value, site, updateTime));
+ }
+
+ private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException, IOException {
+ int numLiveNodes = 0;
+ int numLiveDecommNodes = 0;
+ int numDeadNodes = 0;
+ int numDeadDecommNodes = 0;
+
+ String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES);
+ JSONTokener tokener = new JSONTokener(deadNodesStrings);
+ JSONObject jsonNodesObject = new JSONObject(tokener);
+ final JSONArray deadNodes = jsonNodesObject.names();
+ for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) {
+ final String hostname = deadNodes.getString(i);
+ final JSONObject deadNode = jsonNodesObject.getJSONObject(hostname);
+ HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, hostname, updateTime);
+ if (deadNode.getBoolean(DATA_NODE_DECOMMISSIONED_STATE)) {
+ ++numDeadDecommNodes;
+ entity.setStatus(TopologyConstants.DATA_NODE_DEAD_DECOMMISSIONED_STATUS);
+ } else {
+ entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
+ }
+ ++numDeadNodes;
+ result.getSlaveNodes().add(entity);
+ }
+ LOG.info("Dead nodes " + numDeadNodes + ", dead but decommissioned nodes: " + numDeadDecommNodes);
+
+ String liveNodesStrings = (String) bean.getPropertyMap().get(LIVE_NODES);
+ tokener = new JSONTokener(liveNodesStrings);
+ jsonNodesObject = new JSONObject(tokener);
+ final JSONArray liveNodes = jsonNodesObject.names();
+ for (int i = 0; liveNodes != null && i < liveNodes.length(); ++i) {
+ final String hostname = liveNodes.getString(i);
+ final JSONObject liveNode = jsonNodesObject.getJSONObject(hostname);
+
+ HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, hostname, updateTime);
+ final Number configuredCapacity = (Number) liveNode.get(DATA_NODE_CAPACITY);
+ entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / 1024.0 / 1024.0 / 1024.0 / 1024.0));
+ final Number capacityUsed = (Number) liveNode.get(DATA_NODE_USED_SPACE);
+ entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / 1024.0 / 1024.0 / 1024.0 / 1024.0));
+ final Number blocksTotal = (Number) liveNode.get(DATA_NODE_NUM_BLOCKS);
+ entity.setNumBlocks(Double.toString(blocksTotal.doubleValue()));
+ if (liveNode.has(DATA_NODE_FAILED_VOLUMN)) {
+ final Number volFails = (Number) liveNode.get(DATA_NODE_FAILED_VOLUMN);
+ entity.setNumFailedVolumes(Double.toString(volFails.doubleValue()));
+ }
+ final String adminState = liveNode.getString(DATA_NODE_ADMIN_STATE);
+ if (DATA_NODE_DECOMMISSIONED.equalsIgnoreCase(adminState)) {
+ ++numLiveDecommNodes;
+ entity.setStatus(TopologyConstants.DATA_NODE_LIVE_DECOMMISSIONED_STATUS);
+ } else {
+ entity.setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
+ }
+ numLiveNodes++;
+ result.getSlaveNodes().add(entity);
+ }
+ LOG.info("Live nodes " + numLiveNodes + ", live but decommissioned nodes: " + numLiveDecommNodes);
+
+ double value = numLiveNodes * 1.0d / result.getSlaveNodes().size();
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.DATA_NODE_ROLE, value, site, updateTime));
+ }
+
+ private HdfsServiceTopologyAPIEntity createHdfsServiceEntity(String roleType, String hostname, long updateTime) {
+ HdfsServiceTopologyAPIEntity entity = new HdfsServiceTopologyAPIEntity();
+ entity.setTimestamp(updateTime);
+ Map<String, String> tags = new HashMap<String, String>();
+ entity.setTags(tags);
+ tags.put(SITE_TAG, site);
+ tags.put(ROLE_TAG, roleType);
+ tags.put(HOSTNAME_TAG, hostname);
+ String rack = rackResolver.resolve(hostname);
+ tags.put(RACK_TAG, rack);
+ return entity;
+ }
+
+ private String buildFSNamesystemURL(String url) {
+ return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_FS_NAME_SYSTEM_BEAN_NAME);
+ }
+
+ private String buildNamenodeInfo(String url) {
+ return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_NAMENODE_INFO);
+ }
+
+ @Override
+ public TopologyConstants.TopologyType getTopologyType() {
+ return TopologyConstants.TopologyType.HDFS;
+ }
+
+ @Override
+ public TopologyConstants.HadoopVersion getHadoopVersion() {
+ return TopologyConstants.HadoopVersion.V2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
new file mode 100644
index 0000000..af6bd51
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.tuple.Values;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyCheckMessageId;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.extractor.TopologyCrawler;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MRTopologyCrawler implements TopologyCrawler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MRTopologyCrawler.class);
+
+ private MRTopologyEntityParser parser;
+ private SpoutOutputCollector outputCollector;
+
+ public MRTopologyCrawler(TopologyCheckAppConfig config, TopologyRackResolver rackResolver, SpoutOutputCollector collector) {
+ this.parser = new MRTopologyEntityParser(config.dataExtractorConfig.site, config.mrConfig, rackResolver);
+ this.outputCollector = collector;
+ }
+
+ @Override
+ public void extract() {
+ long updateTimestamp = System.currentTimeMillis();
+ TopologyEntityParserResult result = parser.parse(updateTimestamp);
+ if (result == null || result.getMasterNodes().isEmpty()) {
+ LOG.warn("No data fetched");
+ return;
+ }
+ TopologyCheckMessageId messageId = new TopologyCheckMessageId(TopologyConstants.TopologyType.MR, updateTimestamp);
+ this.outputCollector.emit(new Values(TopologyConstants.MR_INSTANCE_SERVICE_NAME, result), messageId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
new file mode 100644
index 0000000..455b1d0
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.apache.eagle.app.utils.AppConstants;
+import org.apache.eagle.app.utils.PathResolverHelper;
+import org.apache.eagle.app.utils.connection.InputStreamUtils;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.EntityBuilderHelper;
+import org.apache.eagle.topology.utils.ServiceNotResponseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+public class MRTopologyEntityParser implements TopologyEntityParser {
+
+ private String [] rmUrls;
+ private String historyServerUrl;
+ private String site;
+ private TopologyRackResolver rackResolver;
+
+ private static final String YARN_NODES_URL = "/ws/v1/cluster/nodes?anonymous=true";
+ private static final String YARN_HISTORY_SERVER_URL = "/ws/v1/history/info";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MRTopologyEntityParser.class);
+ private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+ static {
+ OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ }
+
+ public MRTopologyEntityParser(String site, TopologyCheckAppConfig.MRConfig config, TopologyRackResolver rackResolver) {
+ this.site = site;
+ this.rmUrls = config.rmUrls;
+ this.historyServerUrl = config.historyServerUrl;
+ this.rackResolver = rackResolver;
+ }
+
+ @Override
+ public TopologyConstants.HadoopVersion getHadoopVersion() {
+ return TopologyConstants.HadoopVersion.V2;
+ }
+
+ @Override
+ public TopologyConstants.TopologyType getTopologyType() {
+ return TopologyConstants.TopologyType.MR;
+ }
+
+ @Override
+ public TopologyEntityParserResult parse(long timestamp) {
+ final TopologyEntityParserResult result = new TopologyEntityParserResult();
+ result.setVersion(TopologyConstants.HadoopVersion.V2);
+
+ for (String url : rmUrls) {
+ try {
+ doParse(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL), timestamp, result);
+ } catch (ServiceNotResponseException ex) {
+ LOGGER.warn("Catch a ServiceNotResponseException with url: {}", url);
+ // reSelect url
+ }
+ }
+ if (result.getMasterNodes().isEmpty()) {
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.RESOURCE_MANAGER_ROLE, 0, site, timestamp));
+ }
+ doCheckHistoryServer(timestamp, result);
+ return result;
+ }
+
+ private void doCheckHistoryServer(long updateTime, TopologyEntityParserResult result) {
+ if (historyServerUrl == null || historyServerUrl.isEmpty()) {
+ return;
+ }
+ String hsUrl = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL);
+ double liveCount = 1;
+ try {
+ InputStreamUtils.getInputStream(hsUrl, null, AppConstants.CompressionType.NONE);
+ } catch (ConnectException e) {
+ liveCount = 0;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, liveCount, site, updateTime));
+ }
+
+ private InputStream getInputStream(String url, AppConstants.CompressionType type) throws ServiceNotResponseException {
+ InputStream is = null;
+ try {
+ is = InputStreamUtils.getInputStream(url, null, type);
+ } catch (ConnectException e) {
+ throw new ServiceNotResponseException(e);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return is;
+ }
+
+ private void doParse(String url, long timestamp, TopologyEntityParserResult result) throws ServiceNotResponseException {
+
+ InputStream is = null;
+ try {
+ LOGGER.info("Going to query URL: " + url);
+ is = InputStreamUtils.getInputStream(url, null, AppConstants.CompressionType.NONE);
+ YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class);
+ if (nodeWrapper.getNodes() == null || nodeWrapper.getNodes().getNode() == null) {
+ throw new ServiceNotResponseException("Invalid result of URL: " + url);
+ }
+ int runningNodeCount = 0;
+ int lostNodeCount = 0;
+ int unhealthyNodeCount = 0;
+ final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode();
+ for (YarnNodeInfo info : list) {
+ final MRServiceTopologyAPIEntity nodeManagerEntity = createEntity(NODE_MANAGER_ROLE, info.getNodeHostName(), timestamp);
+ if (info.getHealthReport() != null && (!info.getHealthReport().isEmpty())) {
+ nodeManagerEntity.setHealthReport(info.getHealthReport());
+ }
+ // TODO: Need to remove the manually mapping RUNNING -> running, LOST - > lost, UNHEALTHY -> unhealthy
+ if (info.getState() != null) {
+ final String state = info.getState().toLowerCase();
+ nodeManagerEntity.setStatus(state);
+ if (state.equals(TopologyConstants.NODE_MANAGER_RUNNING_STATUS)) {
+ ++runningNodeCount;
+ } else if (state.equals(TopologyConstants.NODE_MANAGER_LOST_STATUS)) {
+ ++lostNodeCount;
+ } else if (state.equals(TopologyConstants.NODE_MANAGER_UNHEALTHY_STATUS)) {
+ ++unhealthyNodeCount;
+ }
+ }
+ result.getSlaveNodes().add(nodeManagerEntity);
+ }
+ LOGGER.info("Running NMs: " + runningNodeCount + ", lost NMs: " + lostNodeCount + ", unhealthy NMs: " + unhealthyNodeCount);
+ final MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, extractMasterHost(url), timestamp);
+ resourceManagerEntity.setStatus(TopologyConstants.RESOURCE_MANAGER_ACTIVE_STATUS);
+ result.getMasterNodes().add(resourceManagerEntity);
+ double value = runningNodeCount * 1d / list.size();
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp));
+ } catch (RuntimeException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ throw new ServiceNotResponseException(e);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ // Do nothing
+ }
+ }
+ }
+ }
+
+ private String extractMasterHost(String url) {
+ Matcher matcher = TopologyConstants.HTTP_HOST_MATCH_PATTERN.matcher(url);
+ if (matcher.find()) {
+ return matcher.group(1);
+ }
+ return url;
+ }
+
+ private String extractRack(YarnNodeInfo info) {
+ if (info.getRack() == null) {
+ return null;
+ }
+ String value = info.getRack();
+ value = value.substring(value.lastIndexOf('/') + 1);
+ return value;
+ }
+
+ private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) {
+ MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity();
+ entity.setLastUpdateTime(updateTime);
+ Map<String, String> tags = new HashMap<String, String>();
+ entity.setTags(tags);
+ tags.put(SITE_TAG, site);
+ tags.put(ROLE_TAG, roleType);
+ tags.put(HOSTNAME_TAG, hostname);
+ String rack = rackResolver.resolve(hostname);
+ tags.put(RACK_TAG, rack);
+ return entity;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
new file mode 100644
index 0000000..9315f78
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class YarnNodeInfo {
+
+ private String rack;
+ private String state;
+ private String id;
+ private String nodeHostName;
+ private String nodeHTTPAddress;
+ private String lastHealthUpdate;
+ private String healthReport;
+ private String numContainers;
+ private String usedMemoryMB;
+ private String availMemoryMB;
+
+
+ public String getRack() {
+ return rack;
+ }
+ public void setRack(String rack) {
+ this.rack = rack;
+ }
+ public String getState() {
+ return state;
+ }
+ public void setState(String state) {
+ this.state = state;
+ }
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public String getNodeHostName() {
+ return nodeHostName;
+ }
+ public void setNodeHostName(String nodeHostName) {
+ this.nodeHostName = nodeHostName;
+ }
+ public String getNodeHTTPAddress() {
+ return nodeHTTPAddress;
+ }
+ public void setNodeHTTPAddress(String nodeHTTPAddress) {
+ this.nodeHTTPAddress = nodeHTTPAddress;
+ }
+ public String getLastHealthUpdate() {
+ return lastHealthUpdate;
+ }
+ public void setLastHealthUpdate(String lastHealthUpdate) {
+ this.lastHealthUpdate = lastHealthUpdate;
+ }
+ public String getHealthReport() {
+ return healthReport;
+ }
+ public void setHealthReport(String healthReport) {
+ this.healthReport = healthReport;
+ }
+ public String getNumContainers() {
+ return numContainers;
+ }
+ public void setNumContainers(String numContainers) {
+ this.numContainers = numContainers;
+ }
+ public String getUsedMemoryMB() {
+ return usedMemoryMB;
+ }
+ public void setUsedMemoryMB(String usedMemoryMB) {
+ this.usedMemoryMB = usedMemoryMB;
+ }
+ public String getAvailMemoryMB() {
+ return availMemoryMB;
+ }
+ public void setAvailMemoryMB(String availMemoryMB) {
+ this.availMemoryMB = availMemoryMB;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
new file mode 100644
index 0000000..83d8d7f
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class YarnNodeInfoWrapper {
+
+ private YarnNodeInfos infos;
+
+ public YarnNodeInfos getNodes() {
+ return infos;
+ }
+
+ public void setNodes(YarnNodeInfos infos) {
+ this.infos = infos;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
new file mode 100644
index 0000000..d715a1e
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class YarnNodeInfos {
+
+ private List<YarnNodeInfo> node;
+
+ public List<YarnNodeInfo> getNode() {
+ return node;
+ }
+
+ public void setNode(List<YarnNodeInfo> node) {
+ this.node = node;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
new file mode 100644
index 0000000..ab7b3cc
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.resolver;
+
+public interface TopologyRackResolver {
+
+ /**
+ *resolve rack by hostname
+ * @return rack name
+ */
+ String resolve(String hostname);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java
new file mode 100644
index 0000000..8f0aff8
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.resolver.impl;
+
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+
+public class DefaultTopologyRackResolver implements TopologyRackResolver {
+
+ private static final String DEFAULT_RACK_NAME = "default-rack";
+ private String rack;
+
+ public DefaultTopologyRackResolver() {
+ this.rack = DEFAULT_RACK_NAME;
+ }
+
+ public DefaultTopologyRackResolver(String rack) {
+ this.rack = rack;
+ }
+
+ /**
+ * If topology.script.file.name is unset, then the rack name for all hostnames is default-rack
+ * @param hostname
+ * @return rack
+ */
+ @Override
+ public String resolve(String hostname) {
+ return rack;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
new file mode 100644
index 0000000..df0f863
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.resolver.impl;
+
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * resolve rack by hostname
+ */
+public class IPMaskTopologyRackResolver implements TopologyRackResolver {
+
+ private final int DEFAULT_RACK_POS = 2;
+ private int rackPos;
+
+ public IPMaskTopologyRackResolver() {
+ this.rackPos = DEFAULT_RACK_POS;
+ }
+
+ public IPMaskTopologyRackResolver(int rackPos) {
+ this.rackPos = (rackPos > 3 || rackPos < 0) ? DEFAULT_RACK_POS : rackPos;
+ }
+
+ @Override
+ public String resolve(String hostname) {
+ String result = null;
+ try {
+ InetAddress address = InetAddress.getByName(hostname);
+ result = "rack" + (int)(address.getAddress()[rackPos] & 0xff);
+ } catch (UnknownHostException e) {
+ //e.printStackTrace();
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java
new file mode 100644
index 0000000..f2f3d41
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.Map;
+
+public class TopologyCheckAppSpout extends BaseRichSpout {
+
+ private TopologyDataExtractor extractor;
+ private TopologyCheckAppConfig topologyCheckAppConfig;
+
+ private long lastFetchTime;
+ private long fetchInterval;
+
+ private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckAppSpout.class);
+
+ public TopologyCheckAppSpout(TopologyCheckAppConfig topologyCheckAppConfig) {
+ this.topologyCheckAppConfig = topologyCheckAppConfig;
+ this.lastFetchTime = 0;
+ this.fetchInterval = topologyCheckAppConfig.dataExtractorConfig.fetchDataIntervalInSecs * DateTimeUtil.ONESECOND;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(TopologyConstants.SERVICE_NAME_FIELD, TopologyConstants.TOPOLOGY_DATA_FIELD));
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.extractor = new TopologyDataExtractor(topologyCheckAppConfig, collector);
+ }
+
+ @Override
+ public void nextTuple() {
+ long currentTime = System.currentTimeMillis();
+ Calendar calendar = Calendar.getInstance();
+ if (currentTime > lastFetchTime + fetchInterval) {
+ calendar.setTimeInMillis(this.lastFetchTime);
+ LOG.info("Last fetch time = {}", calendar.getTime());
+ this.extractor.crawl();
+ lastFetchTime = currentTime;
+ }
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ LOG.warn("ack {}", msgId.toString());
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ LOG.info("ack {}", msgId.toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
new file mode 100644
index 0000000..32eae9b
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.extractor.TopologyCrawler;
+import org.apache.eagle.topology.extractor.TopologyExtractorFactory;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+public class TopologyDataExtractor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TopologyDataExtractor.class);
+ private static final int MIN_WAIT_TIME_SECS = 60;
+ private static final double FETCH_TIMEOUT_FACTOR = 0.8;
+
+ private TopologyCheckAppConfig config;
+ private List<TopologyCrawler> extractors;
+ private ExecutorService executorService;
+
+ public TopologyDataExtractor(TopologyCheckAppConfig topologyCheckAppConfig, SpoutOutputCollector collector) {
+ this.config = topologyCheckAppConfig;
+ extractors = getExtractors(collector);
+ executorService = Executors.newFixedThreadPool(topologyCheckAppConfig.dataExtractorConfig.parseThreadPoolSize);
+ }
+
+ public void crawl() {
+ List<Future<?>> futures = new ArrayList<>();
+ for (TopologyCrawler topologyExtractor : extractors) {
+ futures.add(executorService.submit(new DataFetchRunnableWrapper(topologyExtractor)));
+ }
+ long fetchTimeoutSecs = (long) Math.max(config.dataExtractorConfig.fetchDataIntervalInSecs * FETCH_TIMEOUT_FACTOR, MIN_WAIT_TIME_SECS);
+ futures.forEach(future -> {
+ try {
+ future.get(fetchTimeoutSecs, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.info("Caught an overtime exception with message" + e.getMessage());
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ } catch (TimeoutException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ private List<TopologyCrawler> getExtractors(SpoutOutputCollector collector) {
+ List<TopologyCrawler> extractors = new ArrayList<>();
+ TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
+ if (config.dataExtractorConfig.resolverCls != null) {
+ try {
+ rackResolver = config.dataExtractorConfig.resolverCls.newInstance();
+ } catch (InstantiationException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ }
+ }
+ for (TopologyType type : config.topologyTypes) {
+ try {
+ extractors.add(TopologyExtractorFactory.create(type, config, rackResolver, collector));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ return extractors;
+ }
+
+ private static class DataFetchRunnableWrapper implements Runnable {
+
+ private TopologyCrawler topologyExtractor;
+
+ public DataFetchRunnableWrapper(TopologyCrawler topologyExtractor) {
+ this.topologyExtractor = topologyExtractor;
+ }
+
+ @Override
+ public void run() {
+ topologyExtractor.extract();
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
new file mode 100644
index 0000000..490f427
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TopologyDataPersistBolt extends BaseRichBolt {
+
+ private TopologyCheckAppConfig config;
+ private IEagleServiceClient client;
+ private OutputCollector collector;
+
+ private static final Logger LOG = LoggerFactory.getLogger(TopologyDataPersistBolt.class);
+
+ public TopologyDataPersistBolt(TopologyCheckAppConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.config));
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ if (input == null) {
+ return;
+ }
+ String serviceName = input.getStringByField(TopologyConstants.SERVICE_NAME_FIELD);
+ TopologyEntityParserResult result = (TopologyEntityParserResult) input.getValueByField(TopologyConstants.TOPOLOGY_DATA_FIELD);
+ Set<String> availableHostnames = new HashSet<String>();
+ List<TopologyBaseAPIEntity> entitiesForDeletion = new ArrayList<>();
+ List<TopologyBaseAPIEntity> entitiesToWrite = new ArrayList<>();
+
+ filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getMasterNodes());
+ filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getSlaveNodes());
+
+ String query = String.format("%s[@site=\"%s\"]{*}", serviceName, this.config.dataExtractorConfig.site);
+ try {
+ GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send();
+ if (response.isSuccess() && response.getObj() != null) {
+ for (TopologyBaseAPIEntity entity : response.getObj()) {
+ if (!availableHostnames.contains(generateKey(entity))) {
+ entitiesForDeletion.add(entity);
+ }
+ }
+ }
+ deleteEntities(entitiesForDeletion, serviceName);
+ writeEntities(entitiesToWrite, serviceName);
+ writeEntities(result.getMetrics(), serviceName);
+ this.collector.ack(input);
+ } catch (Exception e) {
+ e.printStackTrace();
+ this.collector.fail(input);
+ }
+ }
+
+ private void filterEntitiesToWrite(List<TopologyBaseAPIEntity> entitiesToWrite, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entities) {
+ for (TopologyBaseAPIEntity entity : entities) {
+ availableHostnames.add(generateKey(entity));
+ entitiesToWrite.add(entity);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) {
+ try {
+ GenericServiceAPIResponseEntity response = client.delete(entities);
+ if (!response.isSuccess()) {
+ LOG.error("Got exception from eagle service: " + response.getException());
+ } else {
+ LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName);
+ }
+ } catch (EagleServiceClientException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ entities.clear();
+ }
+
+ private void writeEntities(List<? extends TaggedLogAPIEntity> entities, String serviceName) {
+ try {
+ GenericServiceAPIResponseEntity response = client.create(entities);
+ if (!response.isSuccess()) {
+ LOG.error("Got exception from eagle service: " + response.getException());
+ } else {
+ LOG.info("Successfully wrote {} entities for {}", entities.size(), serviceName);
+ }
+ } catch (Exception e) {
+ LOG.error("cannot create entities successfully", e);
+ }
+ entities.clear();
+ }
+
+ private String generateKey(TopologyBaseAPIEntity entity) {
+ return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
+ entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
+ entity.getTags().get(TopologyConstants.ROLE_TAG));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
new file mode 100644
index 0000000..55c6183
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+public class EntityBuilderHelper {
+
+ public static String resolveHostByIp(String ip) {
+ InetAddress addr = null;
+ try {
+ addr = InetAddress.getByName(ip);
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ return addr.getHostName();
+ }
+
+ public static GenericMetricEntity metricWrapper(Long timestamp, String metricName, double value, Map<String, String> tags) {
+ GenericMetricEntity metricEntity = new GenericMetricEntity();
+ metricEntity.setTimestamp(timestamp);
+ metricEntity.setTags(tags);
+ metricEntity.setPrefix(metricName);
+ metricEntity.setValue(new double[]{value});
+ return metricEntity;
+ }
+
+ public static GenericMetricEntity generateMetric(String role, double value, String site, long timestamp) {
+ Map<String, String> tags = new HashMap<>();
+ tags.put(TopologyConstants.SITE_TAG, site);
+ tags.put(TopologyConstants.ROLE_TAG, role);
+ String metricName = String.format(TopologyConstants.METRIC_LIVE_RATIO_NAME_FORMAT, role);
+ return EntityBuilderHelper.metricWrapper(timestamp, metricName, value, tags);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java
new file mode 100644
index 0000000..d9ac3e0
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import java.util.Map;
+
+public class JMXBean {
+
+ private Map<String, Object> propertyMap;
+
+ public Map<String, Object> getPropertyMap() {
+ return propertyMap;
+ }
+
+ public void setPropertyMap(Map<String, Object> propertyMap) {
+ this.propertyMap = propertyMap;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
new file mode 100644
index 0000000..fa4c71f
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import org.apache.eagle.app.utils.connection.URLConnectionUtils;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URLConnection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helper class to query Hadoop JMX servlets
+ */
+public final class JMXQueryHelper {
+
+ private static final int DEFAULT_QUERY_TIMEOUT = 30 * 60 * 1000;
+ private static final Logger LOG = LoggerFactory.getLogger(JMXQueryHelper.class);
+
+ public static Map<String, JMXBean> query(String jmxQueryUrl) throws JSONException, IOException {
+ LOG.info("Going to query JMX url: " + jmxQueryUrl);
+ InputStream is = null;
+ try {
+ final URLConnection connection = URLConnectionUtils.getConnection(jmxQueryUrl);
+ connection.setReadTimeout(DEFAULT_QUERY_TIMEOUT);
+ is = connection.getInputStream();
+ return parseStream(is);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ } finally {
+ if (is != null) {
+ is.close();
+ }
+ }
+ }
+
+ public static Map<String, JMXBean> parseStream(InputStream is) {
+ final Map<String, JMXBean> resultMap = new HashMap<String, JMXBean>();
+ final JSONTokener tokener = new JSONTokener(is);
+ final JSONObject jsonBeansObject = new JSONObject(tokener);
+ final JSONArray jsonArray = jsonBeansObject.getJSONArray("beans");
+ int size = jsonArray.length();
+ for (int i = 0; i < size; ++i) {
+ final JSONObject obj = (JSONObject)jsonArray.get(i);
+ final JMXBean bean = new JMXBean();
+ final Map<String, Object> map = new HashMap<String, Object>();
+ bean.setPropertyMap(map);
+ final JSONArray names = obj.names();
+ int jsonSize = names.length();
+ for (int j = 0 ; j < jsonSize; ++j) {
+ final String key = names.getString(j);
+ Object value = obj.get(key);
+ map.put(key, value);
+ }
+ final String nameString = (String) map.get("name");
+ resultMap.put(nameString, bean);
+ }
+ return resultMap;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
new file mode 100644
index 0000000..48c9133
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import java.io.IOException;
+
+public class ServiceNotResponseException extends IOException {
+
+ private static final long serialVersionUID = -2425311876734366496L;
+
+ /**
+ * Default constructor of FeederException
+ */
+ public ServiceNotResponseException() {
+ super();
+ }
+
+ /**
+ * Constructor of FeederException
+ *
+ * @param message error message
+ */
+ public ServiceNotResponseException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructor of FeederException
+ *
+ * @param message error message
+ * @param cause the cause of the exception
+ *
+ */
+ public ServiceNotResponseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructor of FeederException
+ *
+ * @param cause the cause of the exception
+ */
+ public ServiceNotResponseException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
new file mode 100644
index 0000000..0a8f1d1
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<application>
+ <type>TOPOLOGY_HEALTH_CHECK_APP</type>
+ <name>Topology Health Check</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.topology.TopologyCheckApp</appClass>
+ <viewPath>/apps/jpm</viewPath>
+ <configuration>
+ <!-- org.apache.eagle.topology.TopologyCheckApp -->
+ <property>
+ <name>dataExtractorConfig.site</name>
+ <displayName>site</displayName>
+ <description>Site</description>
+ <value>sandbox</value>
+ </property>
+ <property>
+ <name>dataExtractorConfig.fetchDataIntervalInSecs</name>
+ <displayName>FetchDataIntervalInSecs</displayName>
+ <description>Fetch Data Interval in Secs</description>
+ <value>300</value>
+ </property>
+ <property>
+ <name>dataExtractorConfig.parseThreadPoolSize</name>
+ <displayName>parseThreadPoolSize</displayName>
+ <description>Parser Thread Pool Size</description>
+ <value>5</value>
+ </property>
+ <property>
+ <name>dataExtractorConfig.numDataFetcherSpout</name>
+ <displayName>numDataFetcherSpout</displayName>
+ <description>Spout Task Number</description>
+ <value>1</value>
+ </property>
+ <property>
+ <name>dataExtractorConfig.numEntityPersistBolt</name>
+ <displayName>numEntityPersistBolt</displayName>
+ <description>Bolt Task Number</description>
+ <value>1</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hbase.zkQuorum</name>
+ <displayName>zkQuorum</displayName>
+ <description>Zookeeper Quorum</description>
+ <value>sandbox.hortonworks.com:2181</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hbase.zkZnodeParent</name>
+ <displayName>zkZnodeParent</displayName>
+ <description>Hbase Zookeeper Znode Parent Root</description>
+ <value>/hbase-unsecure</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hbase.zkPropertyClientPort</name>
+ <displayName>zkPropertyClientPort</displayName>
+ <description>Hbase Zookeeper Client Port</description>
+ <value>2181</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hbase.kerberos.master.principal</name>
+ <displayName>hbaseMasterPrincipal</displayName>
+ <description>Hbase Master Principal</description>
+ <value>hadoop/_HOST@EXAMPLE.COM</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hbase.kerberos.eagle.keytab</name>
+ <displayName>eagleKeytab</displayName>
+ <description>Eagle keytab</description>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hbase.kerberos.master.principal</name>
+ <displayName>hbaseMasterPrincipal</displayName>
+ <description>Hbase Master Principal</description>
+ <value>hadoop/_HOST@EXAMPLE.COM</value>
+ </property>
+
+ <property>
+ <name>dataSourceConfig.hdfs.namenodeUrl</name>
+ <displayName>hdfsNamenodeUrl</displayName>
+ <description>Hdfs Namenode Web URL</description>
+ <value>http://sandbox.hortonworks.com:50070</value>
+ </property>
+
+ <property>
+ <name>dataSourceConfig.mr.rmUrl</name>
+ <displayName>resourceManagerUrl</displayName>
+ <description>Resource Manager URL</description>
+ <value>http://sandbox.hortonworks.com:8088</value>
+ </property>
+
+ <property>
+ <name>dataSourceConfig.mr.historyServerUrl</name>
+ <displayName>historyServerUrl</displayName>
+ <description>History Server URL</description>
+ <value></value>
+ </property>
+
+ <property>
+ <name>eagleProps.eagleService.host</name>
+ <description>eagleProps.eagleService.host</description>
+ <value>localhost</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.port</name>
+ <description>eagleProps.eagleService.port</description>
+ <value>9090</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.username</name>
+ <description>eagleProps.eagleService.username</description>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.password</name>
+ <description>eagleProps.eagleService.password</description>
+ <value>secret</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.basePath</name>
+ <description>eagleProps.eagleService.basePath</description>
+ <value>/rest</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.readTimeOutSeconds</name>
+ <displayName>eagleProps.eagleService.readTimeOutSeconds</displayName>
+ <description>The maximum amount of time (in seconds) the app is trying to read from eagle service</description>
+ <value>2</value>
+ </property>
+
+ </configuration>
+ <docs>
+ <install>
+ </install>
+ <uninstall>
+ </uninstall>
+ </docs>
+</application>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..4e08313
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.eagle.topology.TopologyCheckAppProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
new file mode 100644
index 0000000..cbc7ac1
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ appId : "topologyCheckApp",
+ mode : "LOCAL",
+ workers : 1,
+
+ dataExtractorConfig : {
+ "site": "sandbox",
+ "fetchDataIntervalInSecs": 300,
+ "parseThreadPoolSize": 5,
+ "numDataFetcherSpout" : 1,
+ "numEntityPersistBolt" : 1,
+ "rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver"
+ }
+
+ dataSourceConfig : {
+ hdfs.namenodeUrl: "http://sandbox.hortonworks.com:50070",
+ hbase: {
+ zkQuorum: "sandbox.hortonworks.com",
+ zkPropertyClientPort : "2181",
+ zkZnodeParent: "/hbase-unsecure",
+ zkRetryTimes : "5",
+ kerberos : {
+ master.principal : "hadoop/_HOST@EXAMPLE.COM"
+ eagle.principal: "", #if not need, then empty
+ eagle.keytab: ""
+ }
+ },
+ mr: {
+ rmUrl: "http://sandbox.hortonworks.com:8088",
+ historyServerUrl : "http://sandbox.hortonworks.com:19888" #if not need, then empty
+ }
+ }
+
+ eagleProps : {
+ "mailHost" : "abc.com",
+ "mailDebug" : "true",
+ eagleService.host:"localhost",
+ eagleService.port: 9090,
+ eagleService.username: "admin",
+ eagleService.password : "secret",
+ eagleService.basePath : "/rest",
+ eagleService.readTimeOutSeconds : 20,
+ eagleService.maxFlushNum : 500
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties b/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6b8c8d6
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=../logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
new file mode 100644
index 0000000..6956ef1
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.topology.extractor.hbase.HbaseTopologyCrawler;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestHbaseTopologyCrawler {
+
+ @Test @Ignore
+ public void test() {
+ Config config = ConfigFactory.load();
+
+ TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config);
+ TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
+ HbaseTopologyCrawler crawler = new HbaseTopologyCrawler(topologyCheckAppConfig, rackResolver, null);
+ crawler.extract();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
new file mode 100644
index 0000000..5069a3b
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.topology.extractor.hdfs.HdfsTopologyCrawler;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestHdfsTopologyCrawler {
+
+ @Test @Ignore
+ public void test() {
+ Config config = ConfigFactory.load();
+
+ TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config);
+ TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
+ HdfsTopologyCrawler crawler = new HdfsTopologyCrawler(topologyCheckAppConfig, rackResolver, null);
+ crawler.extract();
+ }
+}