Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/30 01:38:26 UTC
[2/5] incubator-eagle git commit: [EAGLE-806] Integrate Metric Process and Persistence with Application Framework
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
index f3eac23..94b3727 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
@@ -1,165 +1,165 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.topology.extractor.hbase;
-
-import org.apache.eagle.app.utils.HadoopSecurityUtil;
-import org.apache.eagle.topology.TopologyCheckAppConfig;
-import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
-import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
-import org.apache.eagle.topology.extractor.TopologyEntityParser;
-import org.apache.eagle.topology.resolver.TopologyRackResolver;
-import org.apache.eagle.topology.utils.EntityBuilderHelper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.eagle.topology.TopologyConstants.*;
-
-public class HbaseTopologyEntityParser implements TopologyEntityParser {
-
- private Configuration hBaseConfiguration;
- private String site;
- private Boolean kerberosEnable = false;
- private TopologyRackResolver rackResolver;
-
- public HbaseTopologyEntityParser(String site, TopologyCheckAppConfig.HBaseConfig hBaseConfig, TopologyRackResolver resolver) {
- this.site = site;
- this.rackResolver = resolver;
- this.hBaseConfiguration = HBaseConfiguration.create();
- this.hBaseConfiguration.set("hbase.zookeeper.quorum", hBaseConfig.zkQuorum);
- this.hBaseConfiguration.set("hbase.zookeeper.property.clientPort", hBaseConfig.zkClientPort);
- this.hBaseConfiguration.set("zookeeper.znode.parent", hBaseConfig.zkRoot);
- this.hBaseConfiguration.set("hbase.client.retries.number", hBaseConfig.zkRetryTimes);
- // kerberos authentication
- if (hBaseConfig.eaglePrincipal != null && hBaseConfig.eagleKeytab != null
- && !hBaseConfig.eaglePrincipal.isEmpty() && !hBaseConfig.eagleKeytab.isEmpty()) {
- this.hBaseConfiguration.set(HadoopSecurityUtil.EAGLE_PRINCIPAL_KEY, hBaseConfig.eaglePrincipal);
- this.hBaseConfiguration.set(HadoopSecurityUtil.EAGLE_KEYTAB_FILE_KEY, hBaseConfig.eagleKeytab);
- this.kerberosEnable = true;
- this.hBaseConfiguration.set("hbase.security.authentication", "kerberos");
- this.hBaseConfiguration.set("hbase.master.kerberos.principal", hBaseConfig.hbaseMasterPrincipal);
- }
- }
-
- private HBaseAdmin getHBaseAdmin() throws IOException {
- if (this.kerberosEnable) {
- HadoopSecurityUtil.login(hBaseConfiguration);
- }
- return new HBaseAdmin(this.hBaseConfiguration);
- }
-
- @Override
- public TopologyEntityParserResult parse(long timestamp) throws IOException {
- long deadServers = 0;
- long liveServers = 0;
- TopologyEntityParserResult result = new TopologyEntityParserResult();
- HBaseAdmin admin = null;
- try {
- admin = getHBaseAdmin();
- ClusterStatus status = admin.getClusterStatus();
- deadServers = status.getDeadServers();
- liveServers = status.getServersSize();
- result.setVersion(HadoopVersion.V2);
- for (ServerName liveServer : status.getServers()) {
- ServerLoad load = status.getLoad(liveServer);
- result.getSlaveNodes().add(parseServer(liveServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_LIVE_STATUS, timestamp));
- }
- for (ServerName deadServer : status.getDeadServerNames()) {
- ServerLoad load = status.getLoad(deadServer);
- result.getSlaveNodes().add(parseServer(deadServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_DEAD_STATUS, timestamp));
- }
- ServerName master = status.getMaster();
- if (master != null) {
- ServerLoad load = status.getLoad(master);
- result.getMasterNodes().add(parseServer(master, load, TopologyConstants.HMASTER_ROLE, TopologyConstants.HMASTER_ACTIVE_STATUS, timestamp));
- }
- for (ServerName backupMaster : status.getBackupMasters()) {
- ServerLoad load = status.getLoad(backupMaster);
- result.getMasterNodes().add(parseServer(backupMaster, load, TopologyConstants.HMASTER_ROLE, TopologyConstants.HMASTER_STANDBY_STATUS, timestamp));
- }
- double liveRatio = liveServers * 1d / (liveServers + deadServers);
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.REGIONSERVER_ROLE, liveRatio, site, timestamp));
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, 1d, site, timestamp));
- return result;
- } catch (RuntimeException e) {
- e.printStackTrace();
- } finally {
- if (admin != null) {
- try {
- admin.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return result;
- }
-
- private HBaseServiceTopologyAPIEntity parseServer(ServerName serverName, ServerLoad serverLoad, String role, String status, long timestamp) {
- if (serverName == null) {
- return null;
- }
- HBaseServiceTopologyAPIEntity entity = createEntity(role, serverName.getHostname(), timestamp);
- parseServerLoad(entity, serverLoad);
- entity.setStatus(status);
- return entity;
- }
-
- private void parseServerLoad(HBaseServiceTopologyAPIEntity entity, ServerLoad load) {
- if (load == null) {
- return;
- }
- entity.setMaxHeapMB(load.getMaxHeapMB());
- entity.setUsedHeapMB(load.getUsedHeapMB());
- entity.setNumRegions(load.getNumberOfRegions());
- entity.setNumRequests(load.getNumberOfRequests());
- }
-
- private HBaseServiceTopologyAPIEntity createEntity(String roleType, String hostname, long timestamp) {
- HBaseServiceTopologyAPIEntity entity = new HBaseServiceTopologyAPIEntity();
- Map<String, String> tags = new HashMap<String, String>();
- entity.setTags(tags);
- tags.put(SITE_TAG, site);
- tags.put(ROLE_TAG, roleType);
- tags.put(HOSTNAME_TAG, hostname);
- String rack = rackResolver.resolve(hostname);
- tags.put(RACK_TAG, rack);
- entity.setLastUpdateTime(timestamp);
- entity.setTimestamp(timestamp);
- return entity;
- }
-
- @Override
- public TopologyConstants.TopologyType getTopologyType() {
- return TopologyType.HBASE;
- }
-
- @Override
- public TopologyConstants.HadoopVersion getHadoopVersion() {
- return HadoopVersion.V2;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.hbase;
+
+import org.apache.eagle.app.utils.HadoopSecurityUtil;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.EntityBuilderHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+public class HbaseTopologyEntityParser implements TopologyEntityParser {
+
+ private Configuration hBaseConfiguration;
+ private String site;
+ private Boolean kerberosEnable = false;
+ private TopologyRackResolver rackResolver;
+
+ public HbaseTopologyEntityParser(String site, TopologyCheckAppConfig.HBaseConfig hBaseConfig, TopologyRackResolver resolver) {
+ this.site = site;
+ this.rackResolver = resolver;
+ this.hBaseConfiguration = HBaseConfiguration.create();
+ this.hBaseConfiguration.set("hbase.zookeeper.quorum", hBaseConfig.zkQuorum);
+ this.hBaseConfiguration.set("hbase.zookeeper.property.clientPort", hBaseConfig.zkClientPort);
+ this.hBaseConfiguration.set("zookeeper.znode.parent", hBaseConfig.zkRoot);
+ this.hBaseConfiguration.set("hbase.client.retries.number", hBaseConfig.zkRetryTimes);
+ // kerberos authentication
+ if (hBaseConfig.eaglePrincipal != null && hBaseConfig.eagleKeytab != null
+ && !hBaseConfig.eaglePrincipal.isEmpty() && !hBaseConfig.eagleKeytab.isEmpty()) {
+ this.hBaseConfiguration.set(HadoopSecurityUtil.EAGLE_PRINCIPAL_KEY, hBaseConfig.eaglePrincipal);
+ this.hBaseConfiguration.set(HadoopSecurityUtil.EAGLE_KEYTAB_FILE_KEY, hBaseConfig.eagleKeytab);
+ this.kerberosEnable = true;
+ this.hBaseConfiguration.set("hbase.security.authentication", "kerberos");
+ this.hBaseConfiguration.set("hbase.master.kerberos.principal", hBaseConfig.hbaseMasterPrincipal);
+ }
+ }
+
+ private HBaseAdmin getHBaseAdmin() throws IOException {
+ if (this.kerberosEnable) {
+ HadoopSecurityUtil.login(hBaseConfiguration);
+ }
+ return new HBaseAdmin(this.hBaseConfiguration);
+ }
+
+ @Override
+ public TopologyEntityParserResult parse(long timestamp) throws IOException {
+ long deadServers = 0;
+ long liveServers = 0;
+ TopologyEntityParserResult result = new TopologyEntityParserResult();
+ HBaseAdmin admin = null;
+ try {
+ admin = getHBaseAdmin();
+ ClusterStatus status = admin.getClusterStatus();
+ deadServers = status.getDeadServers();
+ liveServers = status.getServersSize();
+ result.setVersion(HadoopVersion.V2);
+ for (ServerName liveServer : status.getServers()) {
+ ServerLoad load = status.getLoad(liveServer);
+ result.getSlaveNodes().add(parseServer(liveServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_LIVE_STATUS, timestamp));
+ }
+ for (ServerName deadServer : status.getDeadServerNames()) {
+ ServerLoad load = status.getLoad(deadServer);
+ result.getSlaveNodes().add(parseServer(deadServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_DEAD_STATUS, timestamp));
+ }
+ ServerName master = status.getMaster();
+ if (master != null) {
+ ServerLoad load = status.getLoad(master);
+ result.getMasterNodes().add(parseServer(master, load, TopologyConstants.HMASTER_ROLE, TopologyConstants.HMASTER_ACTIVE_STATUS, timestamp));
+ }
+ for (ServerName backupMaster : status.getBackupMasters()) {
+ ServerLoad load = status.getLoad(backupMaster);
+ result.getMasterNodes().add(parseServer(backupMaster, load, TopologyConstants.HMASTER_ROLE, TopologyConstants.HMASTER_STANDBY_STATUS, timestamp));
+ }
+ double liveRatio = liveServers * 1d / (liveServers + deadServers);
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.REGIONSERVER_ROLE, liveRatio, site, timestamp));
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, 1d, site, timestamp));
+ return result;
+ } catch (RuntimeException e) {
+ e.printStackTrace();
+ } finally {
+ if (admin != null) {
+ try {
+ admin.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return result;
+ }
+
+ private HBaseServiceTopologyAPIEntity parseServer(ServerName serverName, ServerLoad serverLoad, String role, String status, long timestamp) {
+ if (serverName == null) {
+ return null;
+ }
+ HBaseServiceTopologyAPIEntity entity = createEntity(role, serverName.getHostname(), timestamp);
+ parseServerLoad(entity, serverLoad);
+ entity.setStatus(status);
+ return entity;
+ }
+
+ private void parseServerLoad(HBaseServiceTopologyAPIEntity entity, ServerLoad load) {
+ if (load == null) {
+ return;
+ }
+ entity.setMaxHeapMB(load.getMaxHeapMB());
+ entity.setUsedHeapMB(load.getUsedHeapMB());
+ entity.setNumRegions(load.getNumberOfRegions());
+ entity.setNumRequests(load.getNumberOfRequests());
+ }
+
+ private HBaseServiceTopologyAPIEntity createEntity(String roleType, String hostname, long timestamp) {
+ HBaseServiceTopologyAPIEntity entity = new HBaseServiceTopologyAPIEntity();
+ Map<String, String> tags = new HashMap<String, String>();
+ entity.setTags(tags);
+ tags.put(SITE_TAG, site);
+ tags.put(ROLE_TAG, roleType);
+ tags.put(HOSTNAME_TAG, hostname);
+ String rack = rackResolver.resolve(hostname);
+ tags.put(RACK_TAG, rack);
+ entity.setLastUpdateTime(timestamp);
+ entity.setTimestamp(timestamp);
+ return entity;
+ }
+
+ @Override
+ public TopologyConstants.TopologyType getTopologyType() {
+ return TopologyType.HBASE;
+ }
+
+ @Override
+ public TopologyConstants.HadoopVersion getHadoopVersion() {
+ return HadoopVersion.V2;
+ }
+}
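
Note on the file above: parse() opens an HBaseAdmin and closes it in an explicit finally block. Since HBaseAdmin is Closeable, the same live-ratio computation can be sketched more compactly with try-with-resources. The sketch below is an illustration of that pattern, not code from this commit; the ZooKeeper settings are placeholders, and the zero-server guard (which the committed code does not have) is an added assumption to avoid NaN on an empty cluster.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.ClusterStatus;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class HbaseLiveRatioSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "zk1.example.com");   // placeholder value
            conf.set("hbase.zookeeper.property.clientPort", "2181"); // placeholder value
            conf.set("zookeeper.znode.parent", "/hbase");            // placeholder value
            // HBaseAdmin is Closeable, so try-with-resources replaces the
            // explicit finally/close() used in HbaseTopologyEntityParser.parse().
            try (HBaseAdmin admin = new HBaseAdmin(conf)) {
                ClusterStatus status = admin.getClusterStatus();
                long live = status.getServersSize();
                long dead = status.getDeadServers();
                // Same regionserver live-ratio metric as the parser; the extra
                // guard against live + dead == 0 is an assumption, not in the commit.
                double liveRatio = (live + dead) == 0 ? 0d : live * 1d / (live + dead);
                System.out.println("regionserver live ratio: " + liveRatio);
            }
        }
    }
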
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
index 3cb55e9..79277a4 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
@@ -1,290 +1,289 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.topology.extractor.hdfs;
-
-import org.apache.eagle.app.utils.PathResolverHelper;
-import org.apache.eagle.topology.TopologyCheckAppConfig;
-import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
-import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
-import org.apache.eagle.topology.extractor.TopologyEntityParser;
-import org.apache.eagle.topology.resolver.TopologyRackResolver;
-import org.apache.eagle.topology.utils.*;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.eagle.topology.TopologyConstants.*;
-import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
-
-public class HdfsTopologyEntityParser implements TopologyEntityParser {
-
- private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class);
- private String[] namenodeUrls;
- private String site;
- private TopologyRackResolver rackResolver;
-
- private static final String JMX_URL = "/jmx?anonymous=true";
- private static final String JMX_FS_NAME_SYSTEM_BEAN_NAME = "Hadoop:service=NameNode,name=FSNamesystem";
- private static final String JMX_NAMENODE_INFO = "Hadoop:service=NameNode,name=NameNodeInfo";
-
- private static final String HA_STATE = "tag.HAState";
- private static final String HA_NAME = "tag.Hostname";
- private static final String CAPACITY_TOTAL_GB = "CapacityTotalGB";
- private static final String CAPACITY_USED_GB = "CapacityUsedGB";
- private static final String BLOCKS_TOTAL = "BlocksTotal";
- private static final String LIVE_NODES = "LiveNodes";
- private static final String DEAD_NODES = "DeadNodes";
-
- private static final String JN_STATUS = "NameJournalStatus";
- private static final String JN_TRANSACTION_INFO = "JournalTransactionInfo";
- private static final String LAST_TX_ID = "LastAppliedOrWrittenTxId";
-
- private static final String DATA_NODE_NUM_BLOCKS = "numBlocks";
- private static final String DATA_NODE_USED_SPACE = "usedSpace";
- private static final String DATA_NODE_CAPACITY = "capacity";
- private static final String DATA_NODE_ADMIN_STATE = "adminState";
- private static final String DATA_NODE_FAILED_VOLUMN = "volfails";
- private static final String DATA_NODE_VERSION = "version";
- private static final String NAME_NODE_VERSION = "Version";
-
- private static final String DATA_NODE_DECOMMISSIONED = "Decommissioned";
- private static final String DATA_NODE_DECOMMISSIONED_STATE = "decommissioned";
-
- private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)";
- private static final String QJM_PATTERN = "([\\d\\.]+):\\d+";
-
- private static final double TB = 1024 * 1024 * 1024 * 1024;
-
- public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) {
- this.namenodeUrls = hdfsConfig.namenodeUrls;
- this.site = site;
- this.rackResolver = rackResolver;
- }
-
- @Override
- public TopologyEntityParserResult parse(long timestamp) throws IOException {
- final TopologyEntityParserResult result = new TopologyEntityParserResult();
- result.setVersion(TopologyConstants.HadoopVersion.V2);
- int numNamenode = 0;
- for (String url : namenodeUrls) {
- try {
- final HdfsServiceTopologyAPIEntity namenodeEntity = createNamenodeEntity(url, timestamp);
- result.getMasterNodes().add(namenodeEntity);
- numNamenode++;
- if (namenodeEntity.getStatus().equalsIgnoreCase(NAME_NODE_ACTIVE_STATUS)) {
- String namenodeVersion = createSlaveNodeEntities(url, timestamp, result);
- namenodeEntity.setVersion(namenodeVersion);
- }
- } catch (RuntimeException ex) {
- ex.printStackTrace();
- } catch (IOException e) {
- LOG.warn("Catch an IOException with url: {}", url);
- }
- }
- double value = numNamenode * 1d / namenodeUrls.length;
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NAME_NODE_ROLE, value, site, timestamp));
- return result;
- }
-
- private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws JSONException, IOException {
- final String urlString = buildFSNamesystemURL(url);
- final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
- final JMXBean bean = jmxBeanMap.get(JMX_FS_NAME_SYSTEM_BEAN_NAME);
-
- if (bean == null || bean.getPropertyMap() == null) {
- throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!");
- }
-
- final String hostname = (String) bean.getPropertyMap().get(HA_NAME);
- HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
- final String state = (String) bean.getPropertyMap().get(HA_STATE);
- result.setStatus(state);
- final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB);
- result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024));
- final Double capacityUsedGB = (Double) bean.getPropertyMap().get(CAPACITY_USED_GB);
- result.setUsedCapacityTB(Double.toString(capacityUsedGB / 1024));
- final Integer blocksTotal = (Integer) bean.getPropertyMap().get(BLOCKS_TOTAL);
- result.setNumBlocks(Integer.toString(blocksTotal));
- return result;
- }
-
- private String createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws IOException {
- final String urlString = buildNamenodeInfo(url);
- final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
- final JMXBean bean = jmxBeanMap.get(JMX_NAMENODE_INFO);
- if (bean == null || bean.getPropertyMap() == null) {
- throw new ServiceNotResponseException("Invalid JMX format, NameNodeInfo bean is null!");
- }
- createAllDataNodeEntities(bean, updateTime, result);
- createAllJournalNodeEntities(bean, updateTime, result);
- return (String) bean.getPropertyMap().get(NAME_NODE_VERSION);
- }
-
- private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws UnknownHostException {
- if (bean.getPropertyMap().get(JN_TRANSACTION_INFO) == null || bean.getPropertyMap().get(JN_STATUS) == null) {
- return;
- }
- String jnInfoString = (String) bean.getPropertyMap().get(JN_TRANSACTION_INFO);
- JSONObject jsonObject = new JSONObject(jnInfoString);
- long lastTxId = Long.parseLong(jsonObject.getString(LAST_TX_ID));
-
- String journalnodeString = (String) bean.getPropertyMap().get(JN_STATUS);
- JSONArray jsonArray = new JSONArray(journalnodeString);
- JSONObject jsonMap = (JSONObject) jsonArray.get(0);
-
- Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>();
- String manager = jsonMap.getString("manager");
- Pattern qjm = Pattern.compile(QJM_PATTERN);
- Matcher jpmMatcher = qjm.matcher(manager);
- while (jpmMatcher.find()) {
- String ip = jpmMatcher.group(1);
- String hostname = EntityBuilderHelper.resolveHostByIp(ip);
- HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.JOURNAL_NODE_ROLE, hostname, updateTime);
- entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
- journalNodesMap.put(ip, entity);
- }
- if (journalNodesMap.isEmpty()) {
- LOG.warn("Fail to find journal node info in JMX");
- return;
- }
-
- String stream = jsonMap.getString("stream");
- Pattern status = Pattern.compile(STATUS_PATTERN);
- Matcher statusMatcher = status.matcher(stream);
- long numLiveJournalNodes = 0;
- while (statusMatcher.find()) {
- numLiveJournalNodes++;
- String ip = statusMatcher.group(1);
- if (journalNodesMap.containsKey(ip)) {
- long txid = Long.parseLong(statusMatcher.group(2));
- journalNodesMap.get(ip).setWrittenTxidDiff(lastTxId - txid);
- journalNodesMap.get(ip).setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
- }
- }
- result.getMasterNodes().addAll(journalNodesMap.values());
-
- double value = numLiveJournalNodes * 1d / journalNodesMap.size();
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.JOURNAL_NODE_ROLE, value, site, updateTime));
- }
-
- private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException, IOException {
- int numLiveNodes = 0;
- int numLiveDecommNodes = 0;
- int numDeadNodes = 0;
- int numDeadDecommNodes = 0;
-
- String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES);
- JSONTokener tokener = new JSONTokener(deadNodesStrings);
- JSONObject jsonNodesObject = new JSONObject(tokener);
- final JSONArray deadNodes = jsonNodesObject.names();
- for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) {
- final String hostname = deadNodes.getString(i);
- final JSONObject deadNode = jsonNodesObject.getJSONObject(hostname);
- HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime);
- if (deadNode.getBoolean(DATA_NODE_DECOMMISSIONED_STATE)) {
- ++numDeadDecommNodes;
- entity.setStatus(TopologyConstants.DATA_NODE_DEAD_DECOMMISSIONED_STATUS);
- } else {
- entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
- }
- ++numDeadNodes;
- result.getSlaveNodes().add(entity);
- }
- LOG.info("Dead nodes " + numDeadNodes + ", dead but decommissioned nodes: " + numDeadDecommNodes);
-
- String liveNodesStrings = (String) bean.getPropertyMap().get(LIVE_NODES);
- tokener = new JSONTokener(liveNodesStrings);
- jsonNodesObject = new JSONObject(tokener);
- final JSONArray liveNodes = jsonNodesObject.names();
- for (int i = 0; liveNodes != null && i < liveNodes.length(); ++i) {
- final String hostname = liveNodes.getString(i);
- final JSONObject liveNode = jsonNodesObject.getJSONObject(hostname);
-
- HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime);
- final Number configuredCapacity = (Number) liveNode.get(DATA_NODE_CAPACITY);
- entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / TB));
- final Number capacityUsed = (Number) liveNode.get(DATA_NODE_USED_SPACE);
- entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / TB));
- final Number blocksTotal = (Number) liveNode.get(DATA_NODE_NUM_BLOCKS);
- entity.setNumBlocks(Double.toString(blocksTotal.doubleValue()));
- if (liveNode.has(DATA_NODE_FAILED_VOLUMN)) {
- final Number volFails = (Number) liveNode.get(DATA_NODE_FAILED_VOLUMN);
- entity.setNumFailedVolumes(Double.toString(volFails.doubleValue()));
- }
- final String adminState = liveNode.getString(DATA_NODE_ADMIN_STATE);
- if (DATA_NODE_DECOMMISSIONED.equalsIgnoreCase(adminState)) {
- ++numLiveDecommNodes;
- entity.setStatus(TopologyConstants.DATA_NODE_LIVE_DECOMMISSIONED_STATUS);
- } else {
- entity.setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
- }
- entity.setVersion(String.valueOf(liveNode.get(DATA_NODE_VERSION)));
- numLiveNodes++;
- result.getSlaveNodes().add(entity);
- }
- LOG.info("Live nodes " + numLiveNodes + ", live but decommissioned nodes: " + numLiveDecommNodes);
-
- double value = numLiveNodes * 1.0d / result.getSlaveNodes().size();
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.DATA_NODE_ROLE, value, site, updateTime));
- }
-
- private HdfsServiceTopologyAPIEntity createHdfsServiceEntity(String roleType, String hostname, long updateTime) {
- HdfsServiceTopologyAPIEntity entity = new HdfsServiceTopologyAPIEntity();
- entity.setTimestamp(updateTime);
- entity.setLastUpdateTime(updateTime);
- Map<String, String> tags = new HashMap<String, String>();
- entity.setTags(tags);
- tags.put(SITE_TAG, site);
- tags.put(ROLE_TAG, roleType);
- tags.put(HOSTNAME_TAG, hostname);
- String rack = rackResolver.resolve(hostname);
- tags.put(RACK_TAG, rack);
- return entity;
- }
-
- private String buildFSNamesystemURL(String url) {
- return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_FS_NAME_SYSTEM_BEAN_NAME);
- }
-
- private String buildNamenodeInfo(String url) {
- return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_NAMENODE_INFO);
- }
-
- @Override
- public TopologyConstants.TopologyType getTopologyType() {
- return TopologyConstants.TopologyType.HDFS;
- }
-
- @Override
- public TopologyConstants.HadoopVersion getHadoopVersion() {
- return TopologyConstants.HadoopVersion.V2;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.hdfs;
+
+import org.apache.eagle.app.utils.PathResolverHelper;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.*;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
+
+public class HdfsTopologyEntityParser implements TopologyEntityParser {
+
+ private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class);
+ private String[] namenodeUrls;
+ private String site;
+ private TopologyRackResolver rackResolver;
+
+ private static final String JMX_URL = "/jmx?anonymous=true";
+ private static final String JMX_FS_NAME_SYSTEM_BEAN_NAME = "Hadoop:service=NameNode,name=FSNamesystem";
+ private static final String JMX_NAMENODE_INFO = "Hadoop:service=NameNode,name=NameNodeInfo";
+
+ private static final String HA_STATE = "tag.HAState";
+ private static final String HA_NAME = "tag.Hostname";
+ private static final String CAPACITY_TOTAL_GB = "CapacityTotalGB";
+ private static final String CAPACITY_USED_GB = "CapacityUsedGB";
+ private static final String BLOCKS_TOTAL = "BlocksTotal";
+ private static final String LIVE_NODES = "LiveNodes";
+ private static final String DEAD_NODES = "DeadNodes";
+
+ private static final String JN_STATUS = "NameJournalStatus";
+ private static final String JN_TRANSACTION_INFO = "JournalTransactionInfo";
+ private static final String LAST_TX_ID = "LastAppliedOrWrittenTxId";
+
+ private static final String DATA_NODE_NUM_BLOCKS = "numBlocks";
+ private static final String DATA_NODE_USED_SPACE = "usedSpace";
+ private static final String DATA_NODE_CAPACITY = "capacity";
+ private static final String DATA_NODE_ADMIN_STATE = "adminState";
+ private static final String DATA_NODE_FAILED_VOLUMN = "volfails";
+ private static final String DATA_NODE_VERSION = "version";
+ private static final String NAME_NODE_VERSION = "Version";
+
+ private static final String DATA_NODE_DECOMMISSIONED = "Decommissioned";
+ private static final String DATA_NODE_DECOMMISSIONED_STATE = "decommissioned";
+
+ private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)";
+ private static final String QJM_PATTERN = "([\\d\\.]+):\\d+";
+
+ private static final double TB = 1024 * 1024 * 1024 * 1024;
+
+ public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) {
+ this.namenodeUrls = hdfsConfig.namenodeUrls;
+ this.site = site;
+ this.rackResolver = rackResolver;
+ }
+
+ @Override
+ public TopologyEntityParserResult parse(long timestamp) throws IOException {
+ final TopologyEntityParserResult result = new TopologyEntityParserResult();
+ result.setVersion(TopologyConstants.HadoopVersion.V2);
+ int numNamenode = 0;
+ for (String url : namenodeUrls) {
+ try {
+ final HdfsServiceTopologyAPIEntity namenodeEntity = createNamenodeEntity(url, timestamp);
+ result.getMasterNodes().add(namenodeEntity);
+ numNamenode++;
+ if (namenodeEntity.getStatus().equalsIgnoreCase(NAME_NODE_ACTIVE_STATUS)) {
+ String namenodeVersion = createSlaveNodeEntities(url, timestamp, result);
+ namenodeEntity.setVersion(namenodeVersion);
+ }
+ } catch (RuntimeException ex) {
+ ex.printStackTrace();
+ } catch (IOException e) {
+ LOG.warn("Catch an IOException with url: {}", url);
+ }
+ }
+ double value = numNamenode * 1d / namenodeUrls.length;
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NAME_NODE_ROLE, value, site, timestamp));
+ return result;
+ }
+
+ private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws JSONException, IOException {
+ final String urlString = buildFSNamesystemURL(url);
+ final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
+ final JMXBean bean = jmxBeanMap.get(JMX_FS_NAME_SYSTEM_BEAN_NAME);
+
+ if (bean == null || bean.getPropertyMap() == null) {
+ throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!");
+ }
+
+ final String hostname = (String) bean.getPropertyMap().get(HA_NAME);
+ HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
+ final String state = (String) bean.getPropertyMap().get(HA_STATE);
+ result.setStatus(state);
+ final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB);
+ result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024));
+ final Double capacityUsedGB = (Double) bean.getPropertyMap().get(CAPACITY_USED_GB);
+ result.setUsedCapacityTB(Double.toString(capacityUsedGB / 1024));
+ final Integer blocksTotal = (Integer) bean.getPropertyMap().get(BLOCKS_TOTAL);
+ result.setNumBlocks(Integer.toString(blocksTotal));
+ return result;
+ }
+
+ private String createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws IOException {
+ final String urlString = buildNamenodeInfo(url);
+ final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
+ final JMXBean bean = jmxBeanMap.get(JMX_NAMENODE_INFO);
+ if (bean == null || bean.getPropertyMap() == null) {
+ throw new ServiceNotResponseException("Invalid JMX format, NameNodeInfo bean is null!");
+ }
+ createAllDataNodeEntities(bean, updateTime, result);
+ createAllJournalNodeEntities(bean, updateTime, result);
+ return (String) bean.getPropertyMap().get(NAME_NODE_VERSION);
+ }
+
+ private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws UnknownHostException {
+ if (bean.getPropertyMap().get(JN_TRANSACTION_INFO) == null || bean.getPropertyMap().get(JN_STATUS) == null) {
+ return;
+ }
+ String jnInfoString = (String) bean.getPropertyMap().get(JN_TRANSACTION_INFO);
+ JSONObject jsonObject = new JSONObject(jnInfoString);
+ long lastTxId = Long.parseLong(jsonObject.getString(LAST_TX_ID));
+
+ String journalnodeString = (String) bean.getPropertyMap().get(JN_STATUS);
+ JSONArray jsonArray = new JSONArray(journalnodeString);
+ JSONObject jsonMap = (JSONObject) jsonArray.get(0);
+
+ Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>();
+ String manager = jsonMap.getString("manager");
+ Pattern qjm = Pattern.compile(QJM_PATTERN);
+ Matcher jpmMatcher = qjm.matcher(manager);
+ while (jpmMatcher.find()) {
+ String ip = jpmMatcher.group(1);
+ String hostname = EntityBuilderHelper.resolveHostByIp(ip);
+ HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.JOURNAL_NODE_ROLE, hostname, updateTime);
+ entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
+ journalNodesMap.put(ip, entity);
+ }
+ if (journalNodesMap.isEmpty()) {
+ LOG.warn("Fail to find journal node info in JMX");
+ return;
+ }
+
+ String stream = jsonMap.getString("stream");
+ Pattern status = Pattern.compile(STATUS_PATTERN);
+ Matcher statusMatcher = status.matcher(stream);
+ long numLiveJournalNodes = 0;
+ while (statusMatcher.find()) {
+ numLiveJournalNodes++;
+ String ip = statusMatcher.group(1);
+ if (journalNodesMap.containsKey(ip)) {
+ long txid = Long.parseLong(statusMatcher.group(2));
+ journalNodesMap.get(ip).setWrittenTxidDiff(lastTxId - txid);
+ journalNodesMap.get(ip).setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
+ }
+ }
+ result.getMasterNodes().addAll(journalNodesMap.values());
+
+ double value = numLiveJournalNodes * 1d / journalNodesMap.size();
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.JOURNAL_NODE_ROLE, value, site, updateTime));
+ }
+
+ private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException, IOException {
+ int numLiveNodes = 0;
+ int numLiveDecommNodes = 0;
+ int numDeadNodes = 0;
+ int numDeadDecommNodes = 0;
+
+ String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES);
+ JSONTokener tokener = new JSONTokener(deadNodesStrings);
+ JSONObject jsonNodesObject = new JSONObject(tokener);
+ final JSONArray deadNodes = jsonNodesObject.names();
+ for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) {
+ final String hostname = deadNodes.getString(i);
+ final JSONObject deadNode = jsonNodesObject.getJSONObject(hostname);
+ HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime);
+ if (deadNode.getBoolean(DATA_NODE_DECOMMISSIONED_STATE)) {
+ ++numDeadDecommNodes;
+ entity.setStatus(TopologyConstants.DATA_NODE_DEAD_DECOMMISSIONED_STATUS);
+ } else {
+ entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
+ }
+ ++numDeadNodes;
+ result.getSlaveNodes().add(entity);
+ }
+ LOG.info("Dead nodes " + numDeadNodes + ", dead but decommissioned nodes: " + numDeadDecommNodes);
+
+ String liveNodesStrings = (String) bean.getPropertyMap().get(LIVE_NODES);
+ tokener = new JSONTokener(liveNodesStrings);
+ jsonNodesObject = new JSONObject(tokener);
+ final JSONArray liveNodes = jsonNodesObject.names();
+ for (int i = 0; liveNodes != null && i < liveNodes.length(); ++i) {
+ final String hostname = liveNodes.getString(i);
+ final JSONObject liveNode = jsonNodesObject.getJSONObject(hostname);
+
+ HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime);
+ final Number configuredCapacity = (Number) liveNode.get(DATA_NODE_CAPACITY);
+ entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / TB));
+ final Number capacityUsed = (Number) liveNode.get(DATA_NODE_USED_SPACE);
+ entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / TB));
+ final Number blocksTotal = (Number) liveNode.get(DATA_NODE_NUM_BLOCKS);
+ entity.setNumBlocks(Double.toString(blocksTotal.doubleValue()));
+ if (liveNode.has(DATA_NODE_FAILED_VOLUMN)) {
+ final Number volFails = (Number) liveNode.get(DATA_NODE_FAILED_VOLUMN);
+ entity.setNumFailedVolumes(Double.toString(volFails.doubleValue()));
+ }
+ final String adminState = liveNode.getString(DATA_NODE_ADMIN_STATE);
+ if (DATA_NODE_DECOMMISSIONED.equalsIgnoreCase(adminState)) {
+ ++numLiveDecommNodes;
+ entity.setStatus(TopologyConstants.DATA_NODE_LIVE_DECOMMISSIONED_STATUS);
+ } else {
+ entity.setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
+ }
+ entity.setVersion(String.valueOf(liveNode.get(DATA_NODE_VERSION)));
+ numLiveNodes++;
+ result.getSlaveNodes().add(entity);
+ }
+ LOG.info("Live nodes " + numLiveNodes + ", live but decommissioned nodes: " + numLiveDecommNodes);
+
+ double value = numLiveNodes * 1.0d / result.getSlaveNodes().size();
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.DATA_NODE_ROLE, value, site, updateTime));
+ }
+
+ private HdfsServiceTopologyAPIEntity createHdfsServiceEntity(String roleType, String hostname, long updateTime) {
+ HdfsServiceTopologyAPIEntity entity = new HdfsServiceTopologyAPIEntity();
+ entity.setTimestamp(updateTime);
+ entity.setLastUpdateTime(updateTime);
+ Map<String, String> tags = new HashMap<String, String>();
+ entity.setTags(tags);
+ tags.put(SITE_TAG, site);
+ tags.put(ROLE_TAG, roleType);
+ tags.put(HOSTNAME_TAG, hostname);
+ String rack = rackResolver.resolve(hostname);
+ tags.put(RACK_TAG, rack);
+ return entity;
+ }
+
+ private String buildFSNamesystemURL(String url) {
+ return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_FS_NAME_SYSTEM_BEAN_NAME);
+ }
+
+ private String buildNamenodeInfo(String url) {
+ return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_NAMENODE_INFO);
+ }
+
+ @Override
+ public TopologyConstants.TopologyType getTopologyType() {
+ return TopologyConstants.TopologyType.HDFS;
+ }
+
+ @Override
+ public TopologyConstants.HadoopVersion getHadoopVersion() {
+ return TopologyConstants.HadoopVersion.V2;
+ }
+}
\ No newline at end of file
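
Note on the file above: the journal-node bookkeeping hinges on two regexes, QJM_PATTERN (every configured JournalNode from the "manager" field) and STATUS_PATTERN (live JournalNodes plus their last written txid from the "stream" field). The self-contained sketch below shows how they extract IPs and transaction IDs; only the two patterns are taken verbatim from the parser, while the sample manager/stream strings are illustrative assumptions about the NameJournalStatus JMX payload and may differ in detail on a real cluster.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class JournalNodeRegexSketch {
        // Patterns copied verbatim from HdfsTopologyEntityParser above.
        private static final Pattern QJM = Pattern.compile("([\\d\\.]+):\\d+");
        private static final Pattern STATUS =
                Pattern.compile("([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)");

        public static void main(String[] args) {
            // Illustrative sample values only (assumed shapes of the JMX fields).
            String manager = "QJM to [10.0.0.1:8485, 10.0.0.2:8485, 10.0.0.3:8485]";
            String stream = "10.0.0.1:8485 (Written txid 950), 10.0.0.2:8485 (Written txid 948)";

            Matcher m = QJM.matcher(manager);
            while (m.find()) {
                // group(1): IP of each configured JournalNode, initially marked dead.
                System.out.println("journal node ip: " + m.group(1));
            }
            Matcher s = STATUS.matcher(stream);
            while (s.find()) {
                // group(1): IP of a live JournalNode; group(2): its last written
                // txid, which the parser subtracts from LastAppliedOrWrittenTxId.
                System.out.println(s.group(1) + " -> txid " + s.group(2));
            }
        }
    }
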
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
index 52148b7..447686c 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
@@ -1,219 +1,219 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.topology.extractor.mr;
-
-import org.apache.eagle.app.utils.AppConstants;
-import org.apache.eagle.app.utils.PathResolverHelper;
-import org.apache.eagle.app.utils.connection.InputStreamUtils;
-import org.apache.eagle.topology.TopologyCheckAppConfig;
-import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
-import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
-import org.apache.eagle.topology.extractor.TopologyEntityParser;
-import org.apache.eagle.topology.resolver.TopologyRackResolver;
-import org.apache.eagle.topology.utils.EntityBuilderHelper;
-import org.apache.eagle.topology.utils.ServiceNotResponseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ConnectException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-
-import static org.apache.eagle.topology.TopologyConstants.*;
-
-public class MRTopologyEntityParser implements TopologyEntityParser {
-
- private String[] rmUrls;
- private String historyServerUrl;
- private String site;
- private TopologyRackResolver rackResolver;
-
- private static final String YARN_NODES_URL = "/ws/v1/cluster/nodes?anonymous=true";
- private static final String YARN_HISTORY_SERVER_URL = "/ws/v1/history/info";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MRTopologyEntityParser.class);
- private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
-
- static {
- OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
- }
-
- public MRTopologyEntityParser(String site, TopologyCheckAppConfig.MRConfig config, TopologyRackResolver rackResolver) {
- this.site = site;
- this.rmUrls = config.rmUrls;
- this.historyServerUrl = config.historyServerUrl;
- this.rackResolver = rackResolver;
- }
-
- @Override
- public TopologyConstants.HadoopVersion getHadoopVersion() {
- return TopologyConstants.HadoopVersion.V2;
- }
-
- @Override
- public TopologyConstants.TopologyType getTopologyType() {
- return TopologyConstants.TopologyType.MR;
- }
-
- @Override
- public TopologyEntityParserResult parse(long timestamp) {
- final TopologyEntityParserResult result = new TopologyEntityParserResult();
- result.setVersion(TopologyConstants.HadoopVersion.V2);
-
- for (String url : rmUrls) {
- try {
- doParse(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL), timestamp, result);
- } catch (ServiceNotResponseException ex) {
- LOGGER.warn("Catch a ServiceNotResponseException with url: {}", url);
- // reSelect url
- }
- }
-
- double value = result.getMasterNodes().isEmpty() ? 0 : result.getMasterNodes().size() * 1d / rmUrls.length;
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.RESOURCE_MANAGER_ROLE, value, site, timestamp));
-
- doCheckHistoryServer(timestamp, result);
- return result;
- }
-
- private void doCheckHistoryServer(long updateTime, TopologyEntityParserResult result) {
- if (historyServerUrl == null || historyServerUrl.isEmpty()) {
- return;
- }
- String hsUrl = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL);
- double liveCount = 1;
- try {
- InputStreamUtils.getInputStream(hsUrl, null, AppConstants.CompressionType.NONE);
- } catch (ConnectException e) {
- liveCount = 0;
- } catch (Exception e) {
- e.printStackTrace();
- return;
- }
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, liveCount, site, updateTime));
- }
-
- private InputStream getInputStream(String url, AppConstants.CompressionType type) throws ServiceNotResponseException {
- InputStream is = null;
- try {
- is = InputStreamUtils.getInputStream(url, null, type);
- } catch (ConnectException e) {
- throw new ServiceNotResponseException(e);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return is;
- }
-
- private void doParse(String url, long timestamp, TopologyEntityParserResult result) throws ServiceNotResponseException {
-
- InputStream is = null;
- try {
- LOGGER.info("Going to query URL: " + url);
- is = InputStreamUtils.getInputStream(url, null, AppConstants.CompressionType.NONE);
- YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class);
- if (nodeWrapper.getNodes() == null || nodeWrapper.getNodes().getNode() == null) {
- throw new ServiceNotResponseException("Invalid result of URL: " + url);
- }
- int runningNodeCount = 0;
- int lostNodeCount = 0;
- int unhealthyNodeCount = 0;
- final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode();
- for (YarnNodeInfo info : list) {
- final MRServiceTopologyAPIEntity nodeManagerEntity = createEntity(NODE_MANAGER_ROLE, info.getNodeHostName(), timestamp);
- if (info.getHealthReport() != null && (!info.getHealthReport().isEmpty())) {
- nodeManagerEntity.setHealthReport(info.getHealthReport());
- }
- // TODO: Need to remove the manually mapping RUNNING -> running, LOST - > lost, UNHEALTHY -> unhealthy
- if (info.getState() != null) {
- final String state = info.getState().toLowerCase();
- nodeManagerEntity.setStatus(state);
- if (state.equals(TopologyConstants.NODE_MANAGER_RUNNING_STATUS)) {
- ++runningNodeCount;
- } else if (state.equals(TopologyConstants.NODE_MANAGER_LOST_STATUS)) {
- ++lostNodeCount;
- } else if (state.equals(TopologyConstants.NODE_MANAGER_UNHEALTHY_STATUS)) {
- ++unhealthyNodeCount;
- }
- }
- result.getSlaveNodes().add(nodeManagerEntity);
- }
- LOGGER.info("Running NMs: " + runningNodeCount + ", lost NMs: " + lostNodeCount + ", unhealthy NMs: " + unhealthyNodeCount);
- final MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, extractMasterHost(url), timestamp);
- resourceManagerEntity.setStatus(TopologyConstants.RESOURCE_MANAGER_ACTIVE_STATUS);
- result.getMasterNodes().add(resourceManagerEntity);
- double value = runningNodeCount * 1d / list.size();
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp));
- } catch (RuntimeException e) {
- e.printStackTrace();
- } catch (IOException e) {
- throw new ServiceNotResponseException(e);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- e.printStackTrace();
- // Do nothing
- }
- }
- }
- }
-
- private String extractMasterHost(String url) {
- Matcher matcher = TopologyConstants.HTTP_HOST_MATCH_PATTERN.matcher(url);
- if (matcher.find()) {
- return matcher.group(1);
- }
- return url;
- }
-
- private String extractRack(YarnNodeInfo info) {
- if (info.getRack() == null) {
- return null;
- }
- String value = info.getRack();
- value = value.substring(value.lastIndexOf('/') + 1);
- return value;
- }
-
- private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) {
- MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity();
- entity.setTimestamp(updateTime);
- entity.setLastUpdateTime(updateTime);
- Map<String, String> tags = new HashMap<String, String>();
- entity.setTags(tags);
- tags.put(SITE_TAG, site);
- tags.put(ROLE_TAG, roleType);
- tags.put(HOSTNAME_TAG, hostname);
- String rack = rackResolver.resolve(hostname);
- tags.put(RACK_TAG, rack);
- return entity;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.apache.eagle.app.utils.AppConstants;
+import org.apache.eagle.app.utils.PathResolverHelper;
+import org.apache.eagle.app.utils.connection.InputStreamUtils;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.EntityBuilderHelper;
+import org.apache.eagle.topology.utils.ServiceNotResponseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+public class MRTopologyEntityParser implements TopologyEntityParser {
+
+ private String[] rmUrls;
+ private String historyServerUrl;
+ private String site;
+ private TopologyRackResolver rackResolver;
+
+ private static final String YARN_NODES_URL = "/ws/v1/cluster/nodes?anonymous=true";
+ private static final String YARN_HISTORY_SERVER_URL = "/ws/v1/history/info";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MRTopologyEntityParser.class);
+ private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+ static {
+ OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ }
+
+ public MRTopologyEntityParser(String site, TopologyCheckAppConfig.MRConfig config, TopologyRackResolver rackResolver) {
+ this.site = site;
+ this.rmUrls = config.rmUrls;
+ this.historyServerUrl = config.historyServerUrl;
+ this.rackResolver = rackResolver;
+ }
+
+ @Override
+ public TopologyConstants.HadoopVersion getHadoopVersion() {
+ return TopologyConstants.HadoopVersion.V2;
+ }
+
+ @Override
+ public TopologyConstants.TopologyType getTopologyType() {
+ return TopologyConstants.TopologyType.MR;
+ }
+
+ @Override
+ public TopologyEntityParserResult parse(long timestamp) {
+ final TopologyEntityParserResult result = new TopologyEntityParserResult();
+ result.setVersion(TopologyConstants.HadoopVersion.V2);
+
+ for (String url : rmUrls) {
+ try {
+ doParse(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL), timestamp, result);
+ } catch (ServiceNotResponseException ex) {
+ LOGGER.warn("Catch a ServiceNotResponseException with url: {}", url);
+ // reSelect url
+ }
+ }
+
+ double value = result.getMasterNodes().isEmpty() ? 0 : result.getMasterNodes().size() * 1d / rmUrls.length;
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.RESOURCE_MANAGER_ROLE, value, site, timestamp));
+
+ doCheckHistoryServer(timestamp, result);
+ return result;
+ }
+
+ private void doCheckHistoryServer(long updateTime, TopologyEntityParserResult result) {
+ if (historyServerUrl == null || historyServerUrl.isEmpty()) {
+ return;
+ }
+ String hsUrl = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL);
+ double liveCount = 1;
+ try {
+ InputStreamUtils.getInputStream(hsUrl, null, AppConstants.CompressionType.NONE);
+ } catch (ConnectException e) {
+ liveCount = 0;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, liveCount, site, updateTime));
+ }
+
+ private InputStream getInputStream(String url, AppConstants.CompressionType type) throws ServiceNotResponseException {
+ InputStream is = null;
+ try {
+ is = InputStreamUtils.getInputStream(url, null, type);
+ } catch (ConnectException e) {
+ throw new ServiceNotResponseException(e);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return is;
+ }
+
+ private void doParse(String url, long timestamp, TopologyEntityParserResult result) throws ServiceNotResponseException {
+
+ InputStream is = null;
+ try {
+ LOGGER.info("Going to query URL: " + url);
+ is = InputStreamUtils.getInputStream(url, null, AppConstants.CompressionType.NONE);
+ YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class);
+ if (nodeWrapper.getNodes() == null || nodeWrapper.getNodes().getNode() == null) {
+ throw new ServiceNotResponseException("Invalid response from url: " + url);
+ }
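+ // Tally node manager states so a healthy-node ratio can be emitted as a metric.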
+ int runningNodeCount = 0;
+ int lostNodeCount = 0;
+ int unhealthyNodeCount = 0;
+ final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode();
+ for (YarnNodeInfo info : list) {
+ final MRServiceTopologyAPIEntity nodeManagerEntity = createEntity(NODE_MANAGER_ROLE, info.getNodeHostName(), timestamp);
+ if (info.getHealthReport() != null && (!info.getHealthReport().isEmpty())) {
+ nodeManagerEntity.setHealthReport(info.getHealthReport());
+ }
+ // TODO: remove the manual mapping RUNNING -> running, LOST -> lost, UNHEALTHY -> unhealthy
+ if (info.getState() != null) {
+ final String state = info.getState().toLowerCase();
+ nodeManagerEntity.setStatus(state);
+ if (state.equals(TopologyConstants.NODE_MANAGER_RUNNING_STATUS)) {
+ ++runningNodeCount;
+ } else if (state.equals(TopologyConstants.NODE_MANAGER_LOST_STATUS)) {
+ ++lostNodeCount;
+ } else if (state.equals(TopologyConstants.NODE_MANAGER_UNHEALTHY_STATUS)) {
+ ++unhealthyNodeCount;
+ }
+ }
+ result.getSlaveNodes().add(nodeManagerEntity);
+ }
+ LOGGER.info("Running NMs: {}, lost NMs: {}, unhealthy NMs: {}", runningNodeCount, lostNodeCount, unhealthyNodeCount);
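+ // The resource manager that served this response is recorded as an active master node.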
+ final MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, extractMasterHost(url), timestamp);
+ resourceManagerEntity.setStatus(TopologyConstants.RESOURCE_MANAGER_ACTIVE_STATUS);
+ result.getMasterNodes().add(resourceManagerEntity);
+ // Guard against division by zero when the node list is empty.
+ double value = list.isEmpty() ? 0 : runningNodeCount * 1d / list.size();
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp));
+ } catch (RuntimeException e) {
+ LOGGER.error("Failed to parse response from url: " + url, e);
+ } catch (IOException e) {
+ throw new ServiceNotResponseException(e);
+ } catch (Exception e) {
+ LOGGER.error("Failed to parse response from url: " + url, e);
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ // Ignore failures on close.
+ }
+ }
+ }
+ }
+
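+ // Extracts the host name from an RM url such as http://host:8088/..., falling back to the raw url when no match.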
+ private String extractMasterHost(String url) {
+ Matcher matcher = TopologyConstants.HTTP_HOST_MATCH_PATTERN.matcher(url);
+ if (matcher.find()) {
+ return matcher.group(1);
+ }
+ return url;
+ }
+
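+ // Rack names arrive as paths like "/default-rack"; keep only the final segment.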
+ private String extractRack(YarnNodeInfo info) {
+ if (info.getRack() == null) {
+ return null;
+ }
+ String value = info.getRack();
+ value = value.substring(value.lastIndexOf('/') + 1);
+ return value;
+ }
+
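+ // Builds a topology entity tagged with site, role, hostname, and the resolved rack.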
+ private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) {
+ MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity();
+ entity.setTimestamp(updateTime);
+ entity.setLastUpdateTime(updateTime);
+ Map<String, String> tags = new HashMap<String, String>();
+ entity.setTags(tags);
+ tags.put(SITE_TAG, site);
+ tags.put(ROLE_TAG, roleType);
+ tags.put(HOSTNAME_TAG, hostname);
+ String rack = rackResolver.resolve(hostname);
+ tags.put(RACK_TAG, rack);
+ return entity;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
index afd3518..11d907b 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -1,200 +1,200 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.topology.storm;
-
-import static org.apache.eagle.topology.TopologyConstants.HOSTNAME_TAG;
-import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
-import static org.apache.eagle.topology.TopologyConstants.ROLE_TAG;
-import static org.apache.eagle.topology.TopologyConstants.SITE_TAG;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.EagleServiceClientException;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.apache.eagle.topology.TopologyCheckAppConfig;
-import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
-import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
-import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
-import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity;
-import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
-import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class TopologyDataPersistBolt extends BaseRichBolt {
-
- private TopologyCheckAppConfig config;
- private IEagleServiceClient client;
- private OutputCollector collector;
-
- private static final Logger LOG = LoggerFactory.getLogger(TopologyDataPersistBolt.class);
-
- public TopologyDataPersistBolt(TopologyCheckAppConfig config) {
- this.config = config;
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig().getString("service.host"), this.config.getConfig().getInt("service.port"),
- this.config.getConfig().getString("service.username"), this.config.getConfig().getString("service.password")));
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- if (input == null) {
- return;
- }
- String serviceName = input.getStringByField(TopologyConstants.SERVICE_NAME_FIELD);
- TopologyEntityParserResult result = (TopologyEntityParserResult) input.getValueByField(TopologyConstants.TOPOLOGY_DATA_FIELD);
- Set<String> availableHostnames = new HashSet<String>();
- List<TopologyBaseAPIEntity> entitiesForDeletion = new ArrayList<>();
- List<TopologyBaseAPIEntity> entitiesToWrite = new ArrayList<>();
-
- filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getMasterNodes());
- filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getSlaveNodes());
-
- String query = String.format("%s[@site=\"%s\"]{*}", serviceName, this.config.dataExtractorConfig.site);
- try {
- GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send();
- if (response.isSuccess() && response.getObj() != null) {
- for (TopologyBaseAPIEntity entity : response.getObj()) {
- if (availableHostnames != null && availableHostnames.size() > 0 && !availableHostnames.contains(generateKey(entity))) {
- entitiesForDeletion.add(entity);
- }
- }
- }
- deleteEntities(entitiesForDeletion, serviceName);
- writeEntities(entitiesToWrite, serviceName);
- writeEntities(result.getMetrics(), serviceName);
- emitToKafkaBolt(result);
- this.collector.ack(input);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- this.collector.fail(input);
- }
- }
-
- private void filterEntitiesToWrite(List<TopologyBaseAPIEntity> entitiesToWrite, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entities) {
- for (TopologyBaseAPIEntity entity : entities) {
- availableHostnames.add(generateKey(entity));
- entitiesToWrite.add(entity);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("f1"));
- }
-
- private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) {
- try {
- GenericServiceAPIResponseEntity response = client.delete(entities);
- if (!response.isSuccess()) {
- LOG.error("Got exception from eagle service: " + response.getException());
- } else {
- LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName);
- }
- } catch (EagleServiceClientException e) {
- LOG.error(e.getMessage(), e);
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
- entities.clear();
- }
-
- private void writeEntities(List<? extends TaggedLogAPIEntity> entities, String serviceName) {
- try {
- GenericServiceAPIResponseEntity response = client.create(entities);
- if (!response.isSuccess()) {
- LOG.error("Got exception from eagle service: " + response.getException());
- } else {
- LOG.info("Successfully wrote {} entities for {}", entities.size(), serviceName);
- }
- } catch (Exception e) {
- LOG.error("cannot create entities successfully", e);
- }
- entities.clear();
- }
-
- private String generateKey(TopologyBaseAPIEntity entity) {
- return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
- entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
- entity.getTags().get(TopologyConstants.ROLE_TAG));
- }
-
- private void emitToKafkaBolt(TopologyEntityParserResult result) {
-
- List<HealthCheckParseAPIEntity> healthCheckParseAPIList = new ArrayList<HealthCheckParseAPIEntity>();
-
- setNodeInfo(result.getMasterNodes(), healthCheckParseAPIList);
-
- setNodeInfo(result.getSlaveNodes(), healthCheckParseAPIList);
-
- for (HealthCheckParseAPIEntity healthCheckAPIEntity : healthCheckParseAPIList) {
- this.collector.emit(new Values(healthCheckAPIEntity));
- }
-
- }
-
- private void setNodeInfo(List<TopologyBaseAPIEntity> topologyBaseAPIList, List<HealthCheckParseAPIEntity> healthCheckParseAPIList) {
- HealthCheckParseAPIEntity healthCheckAPIEntity = null;
- for (Iterator<TopologyBaseAPIEntity> iterator = topologyBaseAPIList.iterator(); iterator.hasNext(); ) {
-
- healthCheckAPIEntity = new HealthCheckParseAPIEntity();
- TopologyBaseAPIEntity topologyBaseAPIEntity = iterator.next();
-
- if (topologyBaseAPIEntity instanceof HBaseServiceTopologyAPIEntity) {
-
- healthCheckAPIEntity.setStatus(((HBaseServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
-
- }
- if (topologyBaseAPIEntity instanceof HdfsServiceTopologyAPIEntity) {
-
- healthCheckAPIEntity.setStatus(((HdfsServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
- }
-
- if (topologyBaseAPIEntity instanceof MRServiceTopologyAPIEntity) {
-
- healthCheckAPIEntity.setStatus(((MRServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
- }
-
- healthCheckAPIEntity.setTimeStamp(topologyBaseAPIEntity.getTimestamp());
- healthCheckAPIEntity.setHost(topologyBaseAPIEntity.getTags().get(HOSTNAME_TAG));
- healthCheckAPIEntity.setRole(topologyBaseAPIEntity.getTags().get(ROLE_TAG));
- healthCheckAPIEntity.setSite(topologyBaseAPIEntity.getTags().get(SITE_TAG));
- healthCheckParseAPIList.add(healthCheckAPIEntity);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.topology.storm;
+
+import static org.apache.eagle.topology.TopologyConstants.HOSTNAME_TAG;
+import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
+import static org.apache.eagle.topology.TopologyConstants.ROLE_TAG;
+import static org.apache.eagle.topology.TopologyConstants.SITE_TAG;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TopologyDataPersistBolt extends BaseRichBolt {
+
+ private TopologyCheckAppConfig config;
+ private IEagleServiceClient client;
+ private OutputCollector collector;
+
+ private static final Logger LOG = LoggerFactory.getLogger(TopologyDataPersistBolt.class);
+
+ public TopologyDataPersistBolt(TopologyCheckAppConfig config) {
+ this.config = config;
+ }
+
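+ // Connects to the Eagle service using service.host/port/username/password from the app config.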
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig().getString("service.host"), this.config.getConfig().getInt("service.port"),
+ this.config.getConfig().getString("service.username"), this.config.getConfig().getString("service.password")));
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ if (input == null) {
+ return;
+ }
+ String serviceName = input.getStringByField(TopologyConstants.SERVICE_NAME_FIELD);
+ TopologyEntityParserResult result = (TopologyEntityParserResult) input.getValueByField(TopologyConstants.TOPOLOGY_DATA_FIELD);
+ Set<String> availableHostnames = new HashSet<String>();
+ List<TopologyBaseAPIEntity> entitiesForDeletion = new ArrayList<>();
+ List<TopologyBaseAPIEntity> entitiesToWrite = new ArrayList<>();
+
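+ // Collect the freshly parsed entities and remember their keys; anything stored but missing here is stale.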
+ filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getMasterNodes());
+ filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getSlaveNodes());
+
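+ // Fetch all stored topology entities for this site to diff against the fresh snapshot.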
+ String query = String.format("%s[@site=\"%s\"]{*}", serviceName, this.config.dataExtractorConfig.site);
+ try {
+ GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send();
+ if (response.isSuccess() && response.getObj() != null) {
+ for (TopologyBaseAPIEntity entity : response.getObj()) {
+ if (!availableHostnames.isEmpty() && !availableHostnames.contains(generateKey(entity))) {
+ entitiesForDeletion.add(entity);
+ }
+ }
+ }
+ deleteEntities(entitiesForDeletion, serviceName);
+ writeEntities(entitiesToWrite, serviceName);
+ writeEntities(result.getMetrics(), serviceName);
+ emitToKafkaBolt(result);
+ this.collector.ack(input);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ this.collector.fail(input);
+ }
+ }
+
+ private void filterEntitiesToWrite(List<TopologyBaseAPIEntity> entitiesToWrite, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entities) {
+ for (TopologyBaseAPIEntity entity : entities) {
+ availableHostnames.add(generateKey(entity));
+ entitiesToWrite.add(entity);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
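+ // Single output field; each tuple carries one HealthCheckParseAPIEntity (see emitToKafkaBolt).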
+ declarer.declare(new Fields("f1"));
+ }
+
+ private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) {
+ try {
+ GenericServiceAPIResponseEntity response = client.delete(entities);
+ if (!response.isSuccess()) {
+ LOG.error("Got exception from eagle service: " + response.getException());
+ } else {
+ LOG.info("Successfully deleted {} entities for {}", entities.size(), serviceName);
+ }
+ } catch (EagleServiceClientException | IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ entities.clear();
+ }
+
+ private void writeEntities(List<? extends TaggedLogAPIEntity> entities, String serviceName) {
+ try {
+ GenericServiceAPIResponseEntity response = client.create(entities);
+ if (!response.isSuccess()) {
+ LOG.error("Got exception from eagle service: " + response.getException());
+ } else {
+ LOG.info("Successfully wrote {} entities for {}", entities.size(), serviceName);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to create entities for " + serviceName, e);
+ }
+ entities.clear();
+ }
+
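+ // Identity key for matching stored entities against the fresh snapshot: site-rack-hostname-role.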
+ private String generateKey(TopologyBaseAPIEntity entity) {
+ return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
+ entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
+ entity.getTags().get(TopologyConstants.ROLE_TAG));
+ }
+
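+ // Convert topology entities into health-check events consumed by the downstream Kafka bolt.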
+ private void emitToKafkaBolt(TopologyEntityParserResult result) {
+ List<HealthCheckParseAPIEntity> healthCheckParseAPIList = new ArrayList<HealthCheckParseAPIEntity>();
+ setNodeInfo(result.getMasterNodes(), healthCheckParseAPIList);
+ setNodeInfo(result.getSlaveNodes(), healthCheckParseAPIList);
+ for (HealthCheckParseAPIEntity healthCheckAPIEntity : healthCheckParseAPIList) {
+ this.collector.emit(new Values(healthCheckAPIEntity));
+ }
+ }
+
+ private void setNodeInfo(List<TopologyBaseAPIEntity> topologyBaseAPIList, List<HealthCheckParseAPIEntity> healthCheckParseAPIList) {
+ for (TopologyBaseAPIEntity topologyBaseAPIEntity : topologyBaseAPIList) {
+ HealthCheckParseAPIEntity healthCheckAPIEntity = new HealthCheckParseAPIEntity();
+ if (topologyBaseAPIEntity instanceof HBaseServiceTopologyAPIEntity) {
+ healthCheckAPIEntity.setStatus(((HBaseServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
+ } else if (topologyBaseAPIEntity instanceof HdfsServiceTopologyAPIEntity) {
+ healthCheckAPIEntity.setStatus(((HdfsServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
+ } else if (topologyBaseAPIEntity instanceof MRServiceTopologyAPIEntity) {
+ healthCheckAPIEntity.setStatus(((MRServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
+ }
+ healthCheckAPIEntity.setTimeStamp(topologyBaseAPIEntity.getTimestamp());
+ healthCheckAPIEntity.setHost(topologyBaseAPIEntity.getTags().get(HOSTNAME_TAG));
+ healthCheckAPIEntity.setRole(topologyBaseAPIEntity.getTags().get(ROLE_TAG));
+ healthCheckAPIEntity.setSite(topologyBaseAPIEntity.getTags().get(SITE_TAG));
+ healthCheckParseAPIList.add(healthCheckAPIEntity);
+ }
+ }
+}