You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/05 10:07:57 UTC
incubator-eagle git commit: [EAGLE-819] Fix yarn node duplication in
topology health check app
Repository: incubator-eagle
Updated Branches:
refs/heads/master f899ca1c2 -> 0d1dcc408
[EAGLE-819] Fix yarn node duplication in topology health check app
https://issues.apache.org/jira/browse/EAGLE-819
Author: Zhao, Qingwen <qi...@apache.org>
Closes #712 from qingwen220/EAGLE-819.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0d1dcc40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0d1dcc40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0d1dcc40
Branch: refs/heads/master
Commit: 0d1dcc408ddf499a09e3868480494062e45cb736
Parents: f899ca1
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Mon Dec 5 18:07:50 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Mon Dec 5 18:07:50 2016 +0800
----------------------------------------------------------------------
...e.alert.app.AlertUnitTopologyAppProvider.xml | 2 +-
.../eagle/alert/engine/topology/TestBolt.java | 4 -
.../app/utils/connection/InputStreamUtils.java | 5 +-
.../connection/ServiceNotResponseException.java | 61 +++++++++
.../utils/connection/URLResourceFetcher.java | 62 ++++++++++
.../src/main/bin/createTables.sql | 6 +-
.../eagle/topology/TopologyCheckAppConfig.java | 2 +-
.../extractor/TopologyEntityParser.java | 2 +-
.../extractor/TopologyEntityParserResult.java | 4 +
.../extractor/hbase/HbaseTopologyCrawler.java | 13 +-
.../hbase/HbaseTopologyEntityParser.java | 29 +++--
.../extractor/hdfs/HdfsTopologyCrawler.java | 10 +-
.../hdfs/HdfsTopologyEntityParser.java | 64 +++++-----
.../extractor/mr/MRTopologyCrawler.java | 2 +-
.../extractor/mr/MRTopologyEntityParser.java | 124 +++++++++----------
.../topology/extractor/mr/YarnNodeInfo.java | 6 +-
.../impl/IPMaskTopologyRackResolver.java | 5 +-
.../topology/storm/TopologyDataPersistBolt.java | 53 ++++----
.../topology/utils/EntityBuilderHelper.java | 11 +-
.../eagle/topology/utils/JMXQueryHelper.java | 21 ++--
.../utils/ServiceNotResponseException.java | 61 ---------
.../eagle/topology/utils/StringUtils.java | 45 -------
.../eagle/topology/TopologyConstants.java | 1 +
.../entity/MRServiceTopologyAPIEntity.java | 11 ++
24 files changed, 314 insertions(+), 290 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 74e97d3..8ecbe8c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -90,7 +90,7 @@
</property>
<property>
<name>spout.stormKafkaUseSameZkQuorumWithKafkaBroker</name>
- <displayName>Spout Transaction Zookeeper to Reuse Broker Zookeeper</displayName>
+ <displayName>Reuse Broker Zookeeper</displayName>
<value>true</value>
<description>Use same zookeeper for kafka server and kafka consumer(Storm-Kafka)</description>
<required>false</required>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
index 1c375fa..dc9c9b3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
@@ -19,7 +19,6 @@
package org.apache.eagle.alert.engine.topology;
-
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
@@ -31,9 +30,6 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
-/**
- * Created by yonzhang on 4/7/16.
- */
@Ignore
@SuppressWarnings( {"rawtypes", "serial"})
public class TestBolt extends BaseRichBolt {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java
index 7b9479f..cf76627 100644
--- a/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java
+++ b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/InputStreamUtils.java
@@ -27,8 +27,8 @@ import java.util.zip.GZIPInputStream;
public class InputStreamUtils {
- private static final int CONNECTION_TIMEOUT = 10 * 1000;
- private static final int READ_TIMEOUT = 5 * 60 * 1000;
+ private static final int CONNECTION_TIMEOUT = 1 * 30 * 1000;
+ private static final int READ_TIMEOUT = 1 * 60 * 1000;
private static final String GZIP_HTTP_HEADER = "Accept-Encoding";
private static final String GZIP_COMPRESSION = "gzip";
@@ -49,7 +49,6 @@ public class InputStreamUtils {
if (null != auth) {
connection.setRequestProperty("Authorization", auth);
}
-
return connection.getInputStream();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/ServiceNotResponseException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/ServiceNotResponseException.java b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/ServiceNotResponseException.java
new file mode 100644
index 0000000..972b3e1
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/ServiceNotResponseException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.utils.connection;
+
+import java.io.IOException;
+
+public class ServiceNotResponseException extends IOException {
+
+ private static final long serialVersionUID = -2425311876734366496L;
+
+ /**
+ * Creates a ServiceNotResponseException with no detail message and no cause.
+ */
+ public ServiceNotResponseException() {
+ super();
+ }
+
+ /**
+ * Creates a ServiceNotResponseException with the given detail message.
+ *
+ * @param message error message
+ */
+ public ServiceNotResponseException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a ServiceNotResponseException with the given detail message and cause.
+ *
+ * @param message error message
+ * @param cause the cause of the exception
+ */
+ public ServiceNotResponseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Creates a ServiceNotResponseException that wraps the given cause.
+ *
+ * @param cause the cause of the exception
+ */
+ public ServiceNotResponseException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/URLResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/URLResourceFetcher.java b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/URLResourceFetcher.java
new file mode 100644
index 0000000..a93fded
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-utils/src/main/java/org/apache/eagle/app/utils/connection/URLResourceFetcher.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.utils.connection;
+
+import org.apache.eagle.app.utils.AppConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Static helpers for opening a stream on a URL with a bounded number of
+ * attempts, and for closing such a stream quietly.
+ */
+public class URLResourceFetcher {
+
+    /** Maximum number of attempts made per call before giving up. */
+    private static final int MAX_RETRY_COUNT = 2;
+    private static final Logger LOG = LoggerFactory.getLogger(URLResourceFetcher.class);
+
+    /**
+     * Opens a stream on {@code url} without requesting compression.
+     *
+     * @param url the URL to query
+     * @return an open stream; the caller is responsible for closing it,
+     *         e.g. via {@link #closeInputStream(InputStream)}
+     * @throws ServiceNotResponseException if no attempt yields a stream
+     */
+    public static InputStream openURLStream(String url) throws ServiceNotResponseException {
+        return openURLStream(url, AppConstants.CompressionType.NONE);
+    }
+
+    /**
+     * Opens a stream on {@code url}, trying at most {@code MAX_RETRY_COUNT} times.
+     *
+     * <p>The first stream that is opened successfully is returned immediately.
+     * Earlier revisions kept looping after a success, which queried the URL
+     * again on every call and leaked the stream opened by the previous attempt.
+     *
+     * @param url             the URL to query
+     * @param compressionType compression to request from the server
+     * @return an open stream; the caller is responsible for closing it
+     * @throws ServiceNotResponseException if no attempt yields a stream; the
+     *                                     last failure, if any, is attached as the cause
+     */
+    public static InputStream openURLStream(String url, AppConstants.CompressionType compressionType) throws ServiceNotResponseException {
+        LOG.info("Going to query URL {}", url);
+        Exception lastFailure = null;
+        for (int attempt = 1; attempt <= MAX_RETRY_COUNT; attempt++) {
+            try {
+                InputStream is = InputStreamUtils.getInputStream(url, null, compressionType);
+                if (is != null) {
+                    // Stop at the first success so the stream is neither re-fetched nor leaked.
+                    return is;
+                }
+            } catch (Exception e) {
+                // Remember the failure so it can be reported as the cause if every attempt fails.
+                lastFailure = e;
+                LOG.warn("fail to fetch data from {} due to {}, and try again", url, e.getMessage());
+            }
+        }
+        // A null cause is permitted and simply means every attempt returned null without throwing.
+        throw new ServiceNotResponseException(String.format("fail to fetch data from %s", url), lastFailure);
+    }
+
+    /**
+     * Closes {@code is} if it is non-null. An {@link IOException} raised by
+     * {@code close()} is logged and not rethrown.
+     *
+     * @param is the stream to close; may be {@code null}
+     */
+    public static void closeInputStream(InputStream is) {
+        if (is != null) {
+            try {
+                is.close();
+            } catch (IOException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-server-assembly/src/main/bin/createTables.sql
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/bin/createTables.sql b/eagle-server-assembly/src/main/bin/createTables.sql
index 549d267..c5472b6 100644
--- a/eagle-server-assembly/src/main/bin/createTables.sql
+++ b/eagle-server-assembly/src/main/bin/createTables.sql
@@ -17,7 +17,7 @@
-- */
---- application framework metadata ---
+-- application framework metadata
CREATE TABLE IF NOT EXISTS applications (
uuid varchar(50) PRIMARY KEY,
@@ -43,7 +43,7 @@ CREATE TABLE IF NOT EXISTS sites (
UNIQUE (siteid)
);
---- eagle security module metadata ---
+-- eagle security module metadata
CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity (
site varchar(20) DEFAULT NULL,
@@ -65,7 +65,7 @@ CREATE TABLE IF NOT EXISTS hbase_sensitivity_entity (
primary key (site, hbase_resource)
);
---- alert engine metadata ---
+-- alert engine metadata
CREATE TABLE IF NOT EXISTS stream_cluster (
id VARCHAR (50) PRIMARY KEY,
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index a1c65a9..da6cd46 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -85,7 +85,7 @@ public class TopologyCheckAppConfig implements Serializable {
//e.printStackTrace();
}
- if (config.getBoolean("dataSourceConfig.hbase.enabled")) {
+ if (config.hasPath("dataSourceConfig.hbase") && config.getBoolean("dataSourceConfig.hbase.enabled")) {
topologyTypes.add(TopologyConstants.TopologyType.HBASE);
hBaseConfig = new HBaseConfig();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
index 64c9bd4..4677d0d 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java
@@ -27,7 +27,7 @@ public interface TopologyEntityParser {
* Parse hadoop topology and return the topology entity results.
* @return the topology entity result
*/
- public TopologyEntityParserResult parse(long timestamp) throws IOException;
+ public TopologyEntityParserResult parse(long timestamp);
/**
* Get topology type for the parser.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
index 1799054..b9f9481 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java
@@ -30,6 +30,10 @@ public class TopologyEntityParserResult {
private List<TopologyBaseAPIEntity> slaveNodes = new ArrayList<>();
private List<GenericMetricEntity> metrics = new ArrayList<>();
+ public TopologyEntityParserResult() {
+ version = TopologyConstants.HadoopVersion.V2;
+ }
+
public List<TopologyBaseAPIEntity> getSlaveNodes() {
return slaveNodes;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java
index 398178f..04a4aa1 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyCrawler.java
@@ -49,19 +49,12 @@ public class HbaseTopologyCrawler implements TopologyCrawler {
@Override
public void extract() {
long updateTimestamp = System.currentTimeMillis();
- TopologyEntityParserResult result = null;
- try {
- result = parser.parse(updateTimestamp);
- } catch (Exception e) {
- e.printStackTrace();
- }
- if (result == null) {
+ TopologyEntityParserResult result = parser.parse(updateTimestamp);;
+
+ if (result == null || result.getMetrics().isEmpty()) {
LOG.warn("No data fetched");
result = new TopologyEntityParserResult();
}
- if (result.getMasterNodes().isEmpty()) {
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, 0, site, updateTimestamp));
- }
TopologyCheckMessageId messageId = new TopologyCheckMessageId(TopologyConstants.TopologyType.HBASE, updateTimestamp);
this.collector.emit(new Values(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME, result), messageId);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
index 94b3727..ce977a9 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.slf4j.Logger;
import java.io.IOException;
import java.util.HashMap;
@@ -41,6 +42,7 @@ import static org.apache.eagle.topology.TopologyConstants.*;
public class HbaseTopologyEntityParser implements TopologyEntityParser {
+ private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HbaseTopologyEntityParser.class);
private Configuration hBaseConfiguration;
private String site;
private Boolean kerberosEnable = false;
@@ -72,18 +74,31 @@ public class HbaseTopologyEntityParser implements TopologyEntityParser {
return new HBaseAdmin(this.hBaseConfiguration);
}
+
@Override
- public TopologyEntityParserResult parse(long timestamp) throws IOException {
+ public TopologyEntityParserResult parse(long timestamp) {
+ final TopologyEntityParserResult result = new TopologyEntityParserResult();
+ int activeRatio = 0;
+ try {
+ doParse(timestamp, result);
+ activeRatio++;
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage(), ex);
+ }
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, activeRatio, site, timestamp));
+ return result;
+ }
+
+ private void doParse(long timestamp, TopologyEntityParserResult result) throws IOException {
long deadServers = 0;
long liveServers = 0;
- TopologyEntityParserResult result = new TopologyEntityParserResult();
HBaseAdmin admin = null;
try {
admin = getHBaseAdmin();
ClusterStatus status = admin.getClusterStatus();
deadServers = status.getDeadServers();
liveServers = status.getServersSize();
- result.setVersion(HadoopVersion.V2);
+
for (ServerName liveServer : status.getServers()) {
ServerLoad load = status.getLoad(liveServer);
result.getSlaveNodes().add(parseServer(liveServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_LIVE_STATUS, timestamp));
@@ -103,20 +118,16 @@ public class HbaseTopologyEntityParser implements TopologyEntityParser {
}
double liveRatio = liveServers * 1d / (liveServers + deadServers);
result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.REGIONSERVER_ROLE, liveRatio, site, timestamp));
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, 1d, site, timestamp));
- return result;
- } catch (RuntimeException e) {
- e.printStackTrace();
+ LOG.info("live servers: {}, dead servers: {}", liveServers, deadServers);
} finally {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
}
}
- return result;
}
private HBaseServiceTopologyAPIEntity parseServer(ServerName serverName, ServerLoad serverLoad, String role, String status, long timestamp) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java
index 7030221..f20204d 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyCrawler.java
@@ -46,14 +46,8 @@ public class HdfsTopologyCrawler implements TopologyCrawler {
@Override
public void extract() {
long updateTimestamp = System.currentTimeMillis();
- TopologyEntityParserResult result = null;
- try {
- result = parser.parse(updateTimestamp);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- if (result == null || result.getMasterNodes().isEmpty()) {
+ TopologyEntityParserResult result = parser.parse(updateTimestamp);
+ if (result == null || result.getMetrics().isEmpty()) {
LOG.warn("No data fetched");
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
index 79277a4..df6605d 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
@@ -19,6 +19,7 @@
package org.apache.eagle.topology.extractor.hdfs;
import org.apache.eagle.app.utils.PathResolverHelper;
+import org.apache.eagle.app.utils.connection.ServiceNotResponseException;
import org.apache.eagle.topology.TopologyCheckAppConfig;
import org.apache.eagle.topology.TopologyConstants;
import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
@@ -26,6 +27,7 @@ import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
import org.apache.eagle.topology.extractor.TopologyEntityParser;
import org.apache.eagle.topology.resolver.TopologyRackResolver;
import org.apache.eagle.topology.utils.*;
+import org.apache.commons.io.FileUtils;
import org.json.JSONArray;
import org.json.JSONException;
@@ -33,8 +35,6 @@ import org.json.JSONObject;
import org.json.JSONTokener;
import org.slf4j.Logger;
-import java.io.IOException;
-import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
@@ -80,8 +80,6 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)";
private static final String QJM_PATTERN = "([\\d\\.]+):\\d+";
- private static final double TB = 1024 * 1024 * 1024 * 1024;
-
public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) {
this.namenodeUrls = hdfsConfig.namenodeUrls;
this.site = site;
@@ -89,31 +87,31 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
}
@Override
- public TopologyEntityParserResult parse(long timestamp) throws IOException {
+ public TopologyEntityParserResult parse(long timestamp) {
final TopologyEntityParserResult result = new TopologyEntityParserResult();
- result.setVersion(TopologyConstants.HadoopVersion.V2);
- int numNamenode = 0;
+ int inActiveHosts = 0;
for (String url : namenodeUrls) {
try {
final HdfsServiceTopologyAPIEntity namenodeEntity = createNamenodeEntity(url, timestamp);
- result.getMasterNodes().add(namenodeEntity);
- numNamenode++;
if (namenodeEntity.getStatus().equalsIgnoreCase(NAME_NODE_ACTIVE_STATUS)) {
String namenodeVersion = createSlaveNodeEntities(url, timestamp, result);
namenodeEntity.setVersion(namenodeVersion);
}
- } catch (RuntimeException ex) {
- ex.printStackTrace();
- } catch (IOException e) {
- LOG.warn("Catch an IOException with url: {}", url);
+ result.getMasterNodes().add(namenodeEntity);
+ } catch (ServiceNotResponseException e) {
+ inActiveHosts++;
+ LOG.error(e.getMessage(), e);
+ } catch (Exception ex) {
+ LOG.error("fail to parse url {} due to {}, and will cancel this parsing", url, ex.getMessage(), ex);
+ result.getSlaveNodes().clear();
}
}
- double value = numNamenode * 1d / namenodeUrls.length;
+ double value = (namenodeUrls.length - inActiveHosts) * 1d / namenodeUrls.length;
result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NAME_NODE_ROLE, value, site, timestamp));
return result;
}
- private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws JSONException, IOException {
+ private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws ServiceNotResponseException {
final String urlString = buildFSNamesystemURL(url);
final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
final JMXBean bean = jmxBeanMap.get(JMX_FS_NAME_SYSTEM_BEAN_NAME);
@@ -121,21 +119,25 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
if (bean == null || bean.getPropertyMap() == null) {
throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!");
}
-
final String hostname = (String) bean.getPropertyMap().get(HA_NAME);
- HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
- final String state = (String) bean.getPropertyMap().get(HA_STATE);
- result.setStatus(state);
- final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB);
- result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024));
- final Double capacityUsedGB = (Double) bean.getPropertyMap().get(CAPACITY_USED_GB);
- result.setUsedCapacityTB(Double.toString(capacityUsedGB / 1024));
- final Integer blocksTotal = (Integer) bean.getPropertyMap().get(BLOCKS_TOTAL);
- result.setNumBlocks(Integer.toString(blocksTotal));
- return result;
+ HdfsServiceTopologyAPIEntity namenode = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
+ try {
+ final String state = (String) bean.getPropertyMap().get(HA_STATE);
+ namenode.setStatus(state);
+ final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB);
+ namenode.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / FileUtils.ONE_KB));
+ final Double capacityUsedGB = (Double) bean.getPropertyMap().get(CAPACITY_USED_GB);
+ namenode.setUsedCapacityTB(Double.toString(capacityUsedGB / FileUtils.ONE_KB));
+ final Integer blocksTotal = (Integer) bean.getPropertyMap().get(BLOCKS_TOTAL);
+ namenode.setNumBlocks(Integer.toString(blocksTotal));
+ } catch (RuntimeException ex) {
+ LOG.error(ex.getMessage(), ex);
+ }
+ return namenode;
+
}
- private String createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws IOException {
+ private String createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws ServiceNotResponseException {
final String urlString = buildNamenodeInfo(url);
final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
final JMXBean bean = jmxBeanMap.get(JMX_NAMENODE_INFO);
@@ -147,7 +149,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
return (String) bean.getPropertyMap().get(NAME_NODE_VERSION);
}
- private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws UnknownHostException {
+ private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) {
if (bean.getPropertyMap().get(JN_TRANSACTION_INFO) == null || bean.getPropertyMap().get(JN_STATUS) == null) {
return;
}
@@ -194,7 +196,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.JOURNAL_NODE_ROLE, value, site, updateTime));
}
- private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException, IOException {
+ private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException {
int numLiveNodes = 0;
int numLiveDecommNodes = 0;
int numDeadNodes = 0;
@@ -229,9 +231,9 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser {
HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime);
final Number configuredCapacity = (Number) liveNode.get(DATA_NODE_CAPACITY);
- entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / TB));
+ entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / FileUtils.ONE_TB));
final Number capacityUsed = (Number) liveNode.get(DATA_NODE_USED_SPACE);
- entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / TB));
+ entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / FileUtils.ONE_TB));
final Number blocksTotal = (Number) liveNode.get(DATA_NODE_NUM_BLOCKS);
entity.setNumBlocks(Double.toString(blocksTotal.doubleValue()));
if (liveNode.has(DATA_NODE_FAILED_VOLUMN)) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
index af6bd51..e843a77 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
@@ -45,7 +45,7 @@ public class MRTopologyCrawler implements TopologyCrawler {
public void extract() {
long updateTimestamp = System.currentTimeMillis();
TopologyEntityParserResult result = parser.parse(updateTimestamp);
- if (result == null || result.getMasterNodes().isEmpty()) {
+ if (result == null || result.getMetrics().isEmpty()) {
LOG.warn("No data fetched");
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
index 447686c..c36e9e1 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
@@ -18,9 +18,7 @@
package org.apache.eagle.topology.extractor.mr;
-import org.apache.eagle.app.utils.AppConstants;
import org.apache.eagle.app.utils.PathResolverHelper;
-import org.apache.eagle.app.utils.connection.InputStreamUtils;
import org.apache.eagle.topology.TopologyCheckAppConfig;
import org.apache.eagle.topology.TopologyConstants;
import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
@@ -28,7 +26,8 @@ import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
import org.apache.eagle.topology.extractor.TopologyEntityParser;
import org.apache.eagle.topology.resolver.TopologyRackResolver;
import org.apache.eagle.topology.utils.EntityBuilderHelper;
-import org.apache.eagle.topology.utils.ServiceNotResponseException;
+import org.apache.eagle.app.utils.connection.ServiceNotResponseException;
+import org.apache.eagle.app.utils.connection.URLResourceFetcher;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
@@ -36,13 +35,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
-import java.net.ConnectException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import static org.apache.eagle.topology.TopologyConstants.*;
+import static org.apache.eagle.topology.utils.EntityBuilderHelper.generateKey;
public class MRTopologyEntityParser implements TopologyEntityParser {
@@ -81,18 +80,31 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
@Override
public TopologyEntityParserResult parse(long timestamp) {
final TopologyEntityParserResult result = new TopologyEntityParserResult();
- result.setVersion(TopologyConstants.HadoopVersion.V2);
+ String rmStatus;
+ int inActiveHosts = 0;
+ boolean isSuccess = false;
for (String url : rmUrls) {
+ MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE,
+ extractMasterHost(url), timestamp);
+ rmStatus = RESOURCE_MANAGER_ACTIVE_STATUS;
try {
- doParse(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL), timestamp, result);
- } catch (ServiceNotResponseException ex) {
- LOGGER.warn("Catch a ServiceNotResponseException with url: {}", url);
- // reSelect url
+ InputStream is = URLResourceFetcher.openURLStream(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL));
+ if (!isSuccess) {
+ // doParse closes the stream itself once the node list has been read
+ isSuccess = doParse(timestamp, is, result);
+ } else {
+ // node list was already parsed from another RM; this stream only proved liveness, so release it
+ URLResourceFetcher.closeInputStream(is);
+ }
+ } catch (IOException e) {
+ inActiveHosts++;
+ LOGGER.warn(e.getMessage(), e);
+ rmStatus = RESOURCE_MANAGER_INACTIVE_STATUS;
+ } catch (Exception ex) {
+ LOGGER.error("fail to parse url {} due to {}, and will cancel this parsing", url, ex.getMessage(), ex);
+ result.getSlaveNodes().clear();
}
+ resourceManagerEntity.setStatus(rmStatus);
+ result.getMasterNodes().add(resourceManagerEntity);
}
-
- double value = result.getMasterNodes().isEmpty() ? 0 : result.getMasterNodes().size() * 1d / rmUrls.length;
+ double value = (rmUrls.length - inActiveHosts) * 1.0 / rmUrls.length;
result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.RESOURCE_MANAGER_ROLE, value, site, timestamp));
doCheckHistoryServer(timestamp, result);
@@ -103,51 +115,41 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
if (historyServerUrl == null || historyServerUrl.isEmpty()) {
return;
}
- String hsUrl = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL);
- double liveCount = 1;
- try {
- InputStreamUtils.getInputStream(hsUrl, null, AppConstants.CompressionType.NONE);
- } catch (ConnectException e) {
- liveCount = 0;
- } catch (Exception e) {
- e.printStackTrace();
- return;
- }
- result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, liveCount, site, updateTime));
- }
-
- private InputStream getInputStream(String url, AppConstants.CompressionType type) throws ServiceNotResponseException {
+ String url = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL);
+ double activeHosts = 0;
InputStream is = null;
try {
- is = InputStreamUtils.getInputStream(url, null, type);
- } catch (ConnectException e) {
- throw new ServiceNotResponseException(e);
- } catch (Exception e) {
- e.printStackTrace();
+ is = URLResourceFetcher.openURLStream(url);
+ activeHosts++;
+ } catch (ServiceNotResponseException e) {
+ LOGGER.error(e.getMessage(), e);
+ } finally {
+ URLResourceFetcher.closeInputStream(is);
}
- return is;
+ result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, activeHosts, site, updateTime));
}
- private void doParse(String url, long timestamp, TopologyEntityParserResult result) throws ServiceNotResponseException {
-
- InputStream is = null;
+ private boolean doParse(long timestamp, InputStream is, TopologyEntityParserResult result) throws IOException {
+ boolean isSuccess = false;
+ String nodeKey;
+ Map<String, MRServiceTopologyAPIEntity> nmMap = new HashMap<>();
try {
- LOGGER.info("Going to query URL: " + url);
- is = InputStreamUtils.getInputStream(url, null, AppConstants.CompressionType.NONE);
YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class);
- if (nodeWrapper.getNodes() == null || nodeWrapper.getNodes().getNode() == null) {
- throw new ServiceNotResponseException("Invalid result of URL: " + url);
- }
int runningNodeCount = 0;
int lostNodeCount = 0;
int unhealthyNodeCount = 0;
+ int rackWarningCount = 0;
final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode();
for (YarnNodeInfo info : list) {
final MRServiceTopologyAPIEntity nodeManagerEntity = createEntity(NODE_MANAGER_ROLE, info.getNodeHostName(), timestamp);
+ if (!extractRack(info).equalsIgnoreCase(nodeManagerEntity.getTags().get(RACK_TAG)) && rackWarningCount < 10) {
+ LOGGER.warn("rack info is inconsistent, please configure the right rack resolver class");
+ rackWarningCount++;
+ }
+ nodeManagerEntity.setLastHealthUpdate(info.getLastHealthUpdate());
if (info.getHealthReport() != null && (!info.getHealthReport().isEmpty())) {
nodeManagerEntity.setHealthReport(info.getHealthReport());
}
- // TODO: Need to remove the manually mapping RUNNING -> running, LOST - > lost, UNHEALTHY -> unhealthy
if (info.getState() != null) {
final String state = info.getState().toLowerCase();
nodeManagerEntity.setStatus(state);
@@ -159,30 +161,23 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
++unhealthyNodeCount;
}
}
- result.getSlaveNodes().add(nodeManagerEntity);
+ nodeKey = generateKey(nodeManagerEntity);
+ final MRServiceTopologyAPIEntity existing = nmMap.get(nodeKey);
+ // when a host is reported more than once, keep the entry with the newest health update
+ if (existing == null || existing.getLastHealthUpdate() <= nodeManagerEntity.getLastHealthUpdate()) {
+ nmMap.put(nodeKey, nodeManagerEntity);
+ }
}
- LOGGER.info("Running NMs: " + runningNodeCount + ", lost NMs: " + lostNodeCount + ", unhealthy NMs: " + unhealthyNodeCount);
- final MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, extractMasterHost(url), timestamp);
- resourceManagerEntity.setStatus(TopologyConstants.RESOURCE_MANAGER_ACTIVE_STATUS);
- result.getMasterNodes().add(resourceManagerEntity);
- double value = runningNodeCount * 1d / list.size();
+ LOGGER.info("Total NMs: {}, Actual NMs: {}, Running NMs: {}, lost NMs: {}, unhealthy NMs: {}", list.size(), nmMap.size(), runningNodeCount, lostNodeCount, unhealthyNodeCount);
+
+ double value = runningNodeCount * 1d / nmMap.size();
result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp));
- } catch (RuntimeException e) {
- e.printStackTrace();
- } catch (IOException e) {
- throw new ServiceNotResponseException(e);
- } catch (Exception e) {
- e.printStackTrace();
+ result.getSlaveNodes().addAll(nmMap.values());
+ isSuccess = true;
} finally {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- e.printStackTrace();
- // Do nothing
- }
- }
+ URLResourceFetcher.closeInputStream(is);
}
+ return isSuccess;
}
private String extractMasterHost(String url) {
@@ -194,14 +189,15 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
}
private String extractRack(YarnNodeInfo info) {
- if (info.getRack() == null) {
- return null;
+ if (info.getRack() == null) { // if a host is DECOMMISSIONED, then no rack info
+ return rackResolver.resolve(info.getNodeHostName());
}
String value = info.getRack();
value = value.substring(value.lastIndexOf('/') + 1);
return value;
}
+
private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) {
MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity();
entity.setTimestamp(updateTime);
@@ -211,8 +207,8 @@ public class MRTopologyEntityParser implements TopologyEntityParser {
tags.put(SITE_TAG, site);
tags.put(ROLE_TAG, roleType);
tags.put(HOSTNAME_TAG, hostname);
- String rack = rackResolver.resolve(hostname);
- tags.put(RACK_TAG, rack);
+ String resolvedRack = rackResolver.resolve(hostname);
+ tags.put(RACK_TAG, resolvedRack);
return entity;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
index 4ca2b06..d3664e2 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
@@ -30,7 +30,7 @@ public class YarnNodeInfo {
private String id;
private String nodeHostName;
private String nodeHTTPAddress;
- private String lastHealthUpdate;
+ private long lastHealthUpdate;
private String healthReport;
private String numContainers;
private String usedMemoryMB;
@@ -77,11 +77,11 @@ public class YarnNodeInfo {
this.nodeHTTPAddress = nodeHTTPAddress;
}
- public String getLastHealthUpdate() {
+ public long getLastHealthUpdate() {
return lastHealthUpdate;
}
- public void setLastHealthUpdate(String lastHealthUpdate) {
+ public void setLastHealthUpdate(long lastHealthUpdate) {
this.lastHealthUpdate = lastHealthUpdate;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
index 821de3c..99a44a6 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
@@ -19,6 +19,7 @@
package org.apache.eagle.topology.resolver.impl;
import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.slf4j.Logger;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -28,6 +29,8 @@ import java.net.UnknownHostException;
*/
public class IPMaskTopologyRackResolver implements TopologyRackResolver {
+ private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(IPMaskTopologyRackResolver.class);
+
private final int pos = 2;
private int rackPos;
@@ -46,7 +49,7 @@ public class IPMaskTopologyRackResolver implements TopologyRackResolver {
InetAddress address = InetAddress.getByName(hostname);
result = "rack" + (int) (address.getAddress()[rackPos] & 0xff);
} catch (UnknownHostException e) {
- //e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
index 11d907b..627ebe3 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -32,10 +32,10 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.EagleServiceClientException;
-import org.apache.eagle.service.client.EagleServiceConnector;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.apache.eagle.topology.TopologyCheckAppConfig;
@@ -49,7 +49,6 @@ import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.*;
public class TopologyDataPersistBolt extends BaseRichBolt {
@@ -66,8 +65,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig().getString("service.host"), this.config.getConfig().getInt("service.port"),
- this.config.getConfig().getString("service.username"), this.config.getConfig().getString("service.password")));
+ this.client = new EagleServiceClientImpl(config.getConfig());
this.collector = collector;
}
@@ -82,22 +80,20 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
List<TopologyBaseAPIEntity> entitiesForDeletion = new ArrayList<>();
List<TopologyBaseAPIEntity> entitiesToWrite = new ArrayList<>();
- filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getMasterNodes());
- filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getSlaveNodes());
+ filterEntitiesToWrite(result, availableHostnames, entitiesToWrite);
String query = String.format("%s[@site=\"%s\"]{*}", serviceName, this.config.dataExtractorConfig.site);
try {
GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send();
if (response.isSuccess() && response.getObj() != null) {
for (TopologyBaseAPIEntity entity : response.getObj()) {
- if (availableHostnames != null && availableHostnames.size() > 0 && !availableHostnames.contains(generateKey(entity))) {
+ if (result.getSlaveNodes().size() > 0 && !availableHostnames.contains(generateKey(entity))) {
entitiesForDeletion.add(entity);
}
}
}
deleteEntities(entitiesForDeletion, serviceName);
- writeEntities(entitiesToWrite, serviceName);
- writeEntities(result.getMetrics(), serviceName);
+ writeEntities(entitiesToWrite, result.getMetrics(), serviceName);
emitToKafkaBolt(result);
this.collector.ack(input);
} catch (Exception e) {
@@ -106,8 +102,12 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
}
}
- private void filterEntitiesToWrite(List<TopologyBaseAPIEntity> entitiesToWrite, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entities) {
- for (TopologyBaseAPIEntity entity : entities) {
+ private void filterEntitiesToWrite(TopologyEntityParserResult result, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entitiesToWrite) {
+ for (TopologyBaseAPIEntity entity : result.getMasterNodes()) {
+ availableHostnames.add(generateKey(entity));
+ entitiesToWrite.add(entity);
+ }
+ for (TopologyBaseAPIEntity entity : result.getSlaveNodes()) {
availableHostnames.add(generateKey(entity));
entitiesToWrite.add(entity);
}
@@ -126,15 +126,13 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
} else {
LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName);
}
- } catch (EagleServiceClientException e) {
- LOG.error(e.getMessage(), e);
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error(e.getMessage(), e);
}
entities.clear();
}
- private void writeEntities(List<? extends TaggedLogAPIEntity> entities, String serviceName) {
+ private void writeEntities(List<? extends TaggedLogAPIEntity> entities, List<GenericMetricEntity> metrics, String serviceName) {
try {
GenericServiceAPIResponseEntity response = client.create(entities);
if (!response.isSuccess()) {
@@ -142,6 +140,12 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
} else {
LOG.info("Successfully wrote {} entities for {}", entities.size(), serviceName);
}
+ response = client.create(metrics);
+ if (!response.isSuccess()) {
+ LOG.error("Got exception from eagle service: " + response.getException());
+ } else {
+ // report the number of metrics written, not the number of entities
+ LOG.info("Successfully wrote {} metrics for {}", metrics.size(), serviceName);
+ }
} catch (Exception e) {
LOG.error("cannot create entities successfully", e);
}
@@ -149,23 +153,19 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
}
private String generateKey(TopologyBaseAPIEntity entity) {
- return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
- entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
- entity.getTags().get(TopologyConstants.ROLE_TAG));
+ // Identity is site/hostname/role (rack no longer part of it); use the raw tag values, since a 32-bit hash could collide and hide a stale entity from deletion.
+ return String.format("%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
+ entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
+ entity.getTags().get(TopologyConstants.ROLE_TAG));
}
private void emitToKafkaBolt(TopologyEntityParserResult result) {
-
List<HealthCheckParseAPIEntity> healthCheckParseAPIList = new ArrayList<HealthCheckParseAPIEntity>();
-
setNodeInfo(result.getMasterNodes(), healthCheckParseAPIList);
-
setNodeInfo(result.getSlaveNodes(), healthCheckParseAPIList);
-
for (HealthCheckParseAPIEntity healthCheckAPIEntity : healthCheckParseAPIList) {
this.collector.emit(new Values(healthCheckAPIEntity));
}
-
}
private void setNodeInfo(List<TopologyBaseAPIEntity> topologyBaseAPIList, List<HealthCheckParseAPIEntity> healthCheckParseAPIList) {
@@ -176,17 +176,12 @@ public class TopologyDataPersistBolt extends BaseRichBolt {
TopologyBaseAPIEntity topologyBaseAPIEntity = iterator.next();
if (topologyBaseAPIEntity instanceof HBaseServiceTopologyAPIEntity) {
-
healthCheckAPIEntity.setStatus(((HBaseServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
-
}
if (topologyBaseAPIEntity instanceof HdfsServiceTopologyAPIEntity) {
-
healthCheckAPIEntity.setStatus(((HdfsServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
}
-
if (topologyBaseAPIEntity instanceof MRServiceTopologyAPIEntity) {
-
healthCheckAPIEntity.setStatus(((MRServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
index f390fa8..8aa1d88 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
@@ -19,8 +19,10 @@
package org.apache.eagle.topology.utils;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -39,7 +41,7 @@ public class EntityBuilderHelper {
return addr.getHostName();
}
- public static GenericMetricEntity metricWrapper(Long timestamp, String metricName, double value, Map<String, String> tags) {
+ private static GenericMetricEntity metricWrapper(Long timestamp, String metricName, double value, Map<String, String> tags) {
GenericMetricEntity metricEntity = new GenericMetricEntity();
metricEntity.setTimestamp(timestamp);
metricEntity.setTags(tags);
@@ -63,4 +65,11 @@ public class EntityBuilderHelper {
return key.indexOf(TopologyConstants.COLON) > 0 ? key.substring(0, key.indexOf(TopologyConstants.COLON)) : key;
}
+ /**
+ * Builds the identity key of a topology entity from its site, hostname and role tags.
+ * The raw tag values are joined instead of hashed, because a 32-bit hash can collide
+ * and make two different nodes look like the same one.
+ *
+ * @param entity entity whose tags are read
+ * @return key of the form {@code site-hostname-role}
+ */
+ public static String generateKey(TopologyBaseAPIEntity entity) {
+ return String.format("%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
+ entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
+ entity.getTags().get(TopologyConstants.ROLE_TAG));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
index bca8485..860a1b8 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
@@ -18,8 +18,10 @@
package org.apache.eagle.topology.utils;
+import org.apache.eagle.app.utils.connection.ServiceNotResponseException;
import org.apache.eagle.app.utils.connection.URLConnectionUtils;
+import org.apache.eagle.app.utils.connection.URLResourceFetcher;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -38,28 +40,19 @@ import java.util.Map;
*/
public final class JMXQueryHelper {
- private static final int DEFAULT_QUERY_TIMEOUT = 30 * 60 * 1000;
- private static final Logger LOG = LoggerFactory.getLogger(JMXQueryHelper.class);
-
- public static Map<String, JMXBean> query(String jmxQueryUrl) throws JSONException, IOException {
- LOG.info("Going to query JMX url: " + jmxQueryUrl);
+ public static Map<String, JMXBean> query(String jmxQueryUrl) throws ServiceNotResponseException {
InputStream is = null;
try {
- final URLConnection connection = URLConnectionUtils.getConnection(jmxQueryUrl);
- connection.setReadTimeout(DEFAULT_QUERY_TIMEOUT);
- is = connection.getInputStream();
+ is = URLResourceFetcher.openURLStream(jmxQueryUrl);
return parseStream(is);
} catch (Exception e) {
- e.printStackTrace();
- return null;
+ throw new ServiceNotResponseException(e);
} finally {
- if (is != null) {
- is.close();
- }
+ URLResourceFetcher.closeInputStream(is);
}
}
- public static Map<String, JMXBean> parseStream(InputStream is) {
+ private static Map<String, JMXBean> parseStream(InputStream is) {
final Map<String, JMXBean> resultMap = new HashMap<String, JMXBean>();
final JSONTokener tokener = new JSONTokener(is);
final JSONObject jsonBeansObject = new JSONObject(tokener);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
deleted file mode 100644
index 3204ca3..0000000
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.topology.utils;
-
-import java.io.IOException;
-
-public class ServiceNotResponseException extends IOException {
-
- private static final long serialVersionUID = -2425311876734366496L;
-
- /**
- * Default constructor of FeederException.
- */
- public ServiceNotResponseException() {
- super();
- }
-
- /**
- * Constructor of FeederException.
- *
- * @param message error message
- */
- public ServiceNotResponseException(String message) {
- super(message);
- }
-
- /**
- * Constructor of FeederException.
- *
- * @param message error message
- * @param cause the cause of the exception
- */
- public ServiceNotResponseException(String message, Throwable cause) {
- super(message, cause);
- }
-
- /**
- * Constructor of FeederException.
- *
- * @param cause the cause of the exception
- */
- public ServiceNotResponseException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java
deleted file mode 100644
index 3b47f84..0000000
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.topology.utils;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public final class StringUtils {
-
- public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
-
- private StringUtils() {
-
- }
-
- public static String convertMapToString(Map<String, String> tags) {
- StringBuilder tagBuilder = new StringBuilder();
- Iterator<Entry<String, String>> iter = tags.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, String> entry = (Map.Entry<String, String>) iter.next();
- tagBuilder.append(entry.getKey() + ":" + entry.getValue());
- if (iter.hasNext()) {
- tagBuilder.append(",");
- }
- }
- return tagBuilder.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
index 3e535a8..488d3f9 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/TopologyConstants.java
@@ -61,6 +61,7 @@ public class TopologyConstants {
// Status definitions for resource manager
public static final String RESOURCE_MANAGER_ACTIVE_STATUS = "active";
+ public static final String RESOURCE_MANAGER_INACTIVE_STATUS = "inactive";
// Status definitions for node manager
public static final String NODE_MANAGER_RUNNING_STATUS = "running";
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0d1dcc40/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
index 82e57fd..006c898 100644
--- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
+++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java
@@ -38,8 +38,19 @@ public class MRServiceTopologyAPIEntity extends TopologyBaseAPIEntity {
@Column("d")
private String healthReport;
@Column("e")
+ private long lastHealthUpdate;
+ @Column("f")
private long lastUpdateTime;
+ public long getLastHealthUpdate() {
+ return lastHealthUpdate;
+ }
+
+ public void setLastHealthUpdate(long lastHealthUpdate) {
+ this.lastHealthUpdate = lastHealthUpdate;
+ valueChanged("lastHealthUpdate");
+ }
+
public long getLastUpdateTime() {
return lastUpdateTime;
}