You are viewing a plain text version of this content; the canonical (HTML) version is available in the mailing list archive.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/29 07:06:12 UTC
eagle git commit: [MINOR] update SparkHistoryJobApp & TopologyCheckApp configs
Repository: eagle
Updated Branches:
refs/heads/master 77fbff720 -> ab50e62ac
[MINOR] update SparkHistoryJobApp & TopologyCheckApp configs
1. add `service.flushLimit` in SparkHistoryJobApp config
2. update TopologyCheckAppConfig.java
Author: Zhao, Qingwen <qi...@apache.org>
Closes #761 from qingwen220/minor.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/ab50e62a
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/ab50e62a
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/ab50e62a
Branch: refs/heads/master
Commit: ab50e62acb2483dd009d87b2d68814e13e3e3d92
Parents: 77fbff7
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Thu Dec 29 15:05:57 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Dec 29 15:05:57 2016 +0800
----------------------------------------------------------------------
.../spark/history/SparkHistoryJobAppConfig.java | 5 ++
.../history/crawl/JHFSparkEventReader.java | 2 +-
...spark.history.SparkHistoryJobAppProvider.xml | 6 +++
.../src/main/resources/application.conf | 1 +
.../apache/eagle/topology/TopologyCheckApp.java | 11 ++--
.../eagle/topology/TopologyCheckAppConfig.java | 20 ++++----
....eagle.topology.TopologyCheckAppProvider.xml | 54 ++++++++++----------
.../src/main/resources/application.conf | 6 +--
.../src/test/resources/application.conf | 10 ++--
9 files changed, 64 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index adde60b..9646fb1 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -87,6 +87,10 @@ public class SparkHistoryJobAppConfig implements Serializable {
if (config.hasPath("service.basePath")) {
this.eagleInfo.basePath = config.getString("service.basePath");
}
+ this.eagleInfo.flushLimit = 500;
+ if (config.hasPath("service.flushLimit")) {
+ this.eagleInfo.flushLimit = config.getInt("service.flushLimit");
+ }
this.stormConfig.siteId = config.getString("siteId");
this.stormConfig.spoutCrawlInterval = config.getInt("topology.spoutCrawlInterval");
@@ -126,5 +130,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
public String password;
public String basePath;
public int timeout;
+ public int flushLimit;
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
index 82e8a41..05e35e4 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -690,7 +690,7 @@ public class JHFSparkEventReader {
private void flushEntities(Collection entities, boolean forceFlush) {
this.createEntities.addAll(entities);
- if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
+ if (forceFlush || this.createEntities.size() >= config.eagleInfo.flushLimit) {
try {
this.doFlush(this.createEntities);
this.createEntities.clear();
http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index 17a3a4a..8c0d472 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -65,6 +65,12 @@
<description>default timeout is 30s</description>
<value>300</value>
</property>
+ <property>
+ <name>service.flushLimit</name>
+ <displayName>service flushing limit</displayName>
+ <description>flushing entities limit</description>
+ <value>500</value>
+ </property>
<!-- datasource config -->
<property>
http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 2839915..62f66f6 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -28,6 +28,7 @@
password : "secret",
basePath : "/rest",
readTimeOutSeconds : 2
+ flushLimit: 500
},
"zookeeper" : {
http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
index ba5914b..87ff27a 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
@@ -30,7 +30,6 @@ import org.apache.eagle.topology.storm.TopologyDataPersistBolt;
public class TopologyCheckApp extends StormApplication {
- private static final String SINK_TASK_NUM = "topology.numOfSinkTasks";
private static final String TOPOLOGY_HEALTH_CHECK_STREAM = "topology_health_check_stream";
@Override
@@ -41,7 +40,6 @@ public class TopologyCheckApp extends StormApplication {
String persistBoltName = TopologyCheckAppConfig.TOPOLOGY_ENTITY_PERSIST_BOLT_NAME;
String parseBoltName = TopologyCheckAppConfig.PARSE_BOLT_NAME;
String kafkaSinkBoltName = TopologyCheckAppConfig.SINK_BOLT_NAME;
- int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout(
@@ -59,10 +57,15 @@ public class TopologyCheckApp extends StormApplication {
topologyBuilder.setBolt(
parseBoltName,
new HealthCheckParseBolt(),
- topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(persistBoltName);
+ topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt
+ ).setNumTasks(topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(persistBoltName);
StormStreamSink<?> sinkBolt = environment.getStreamSink(TOPOLOGY_HEALTH_CHECK_STREAM, config);
- topologyBuilder.setBolt(kafkaSinkBoltName, sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks).shuffleGrouping(parseBoltName);
+ topologyBuilder.setBolt(
+ kafkaSinkBoltName,
+ sinkBolt,
+ topologyCheckAppConfig.dataExtractorConfig.numKafkaSinkBolt
+ ).setNumTasks(topologyCheckAppConfig.dataExtractorConfig.numKafkaSinkBolt).shuffleGrouping(parseBoltName);
return topologyBuilder.createTopology();
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index f6d61f6..90a3773 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -69,14 +69,15 @@ public class TopologyCheckAppConfig implements Serializable {
this.config = config;
this.dataExtractorConfig.site = config.getString("siteId");
- this.dataExtractorConfig.fetchDataIntervalInSecs = config.getLong("dataExtractorConfig.fetchDataIntervalInSecs");
+ this.dataExtractorConfig.fetchDataIntervalInSecs = config.getLong("topology.fetchDataIntervalInSecs");
this.dataExtractorConfig.parseThreadPoolSize = MAX_NUM_THREADS;
- if (config.hasPath("dataExtractorConfig.parseThreadPoolSize")) {
- this.dataExtractorConfig.parseThreadPoolSize = config.getInt("dataExtractorConfig.parseThreadPoolSize");
+ if (config.hasPath("topology.parseThreadPoolSize")) {
+ this.dataExtractorConfig.parseThreadPoolSize = config.getInt("topology.parseThreadPoolSize");
}
- this.dataExtractorConfig.numDataFetcherSpout = config.getInt("dataExtractorConfig.numDataFetcherSpout");
- this.dataExtractorConfig.numEntityPersistBolt = config.getInt("dataExtractorConfig.numEntityPersistBolt");
- String resolveCls = config.getString("dataExtractorConfig.rackResolverCls");
+ this.dataExtractorConfig.numDataFetcherSpout = config.getInt("topology.numDataFetcherSpout");
+ this.dataExtractorConfig.numEntityPersistBolt = config.getInt("topology.numEntityPersistBolt");
+ this.dataExtractorConfig.numKafkaSinkBolt = config.getInt("topology.numOfKafkaSinkBolt");
+ String resolveCls = config.getString("topology.rackResolverCls");
try {
this.dataExtractorConfig.resolverCls = (Class<? extends TopologyRackResolver>) Class.forName(resolveCls);
} catch (ClassNotFoundException e) {
@@ -85,7 +86,7 @@ public class TopologyCheckAppConfig implements Serializable {
//e.printStackTrace();
}
- if (config.hasPath("dataSourceConfig.hbase") && config.getBoolean("dataSourceConfig.hbase.enabled")) {
+ if (config.hasPath("dataSourceConfig.hbase.enabled") && config.getBoolean("dataSourceConfig.hbase.enabled")) {
topologyTypes.add(TopologyConstants.TopologyType.HBASE);
hBaseConfig = new HBaseConfig();
@@ -98,14 +99,14 @@ public class TopologyCheckAppConfig implements Serializable {
hBaseConfig.hbaseMasterPrincipal = getOptionalConfig("dataSourceConfig.hbase.kerberos.master.principal", null);
}
- if (config.hasPath("dataSourceConfig.mr") && config.getBoolean("dataSourceConfig.mr.enabled")) {
+ if (config.hasPath("dataSourceConfig.mr.enabled") && config.getBoolean("dataSourceConfig.mr.enabled")) {
topologyTypes.add(TopologyConstants.TopologyType.MR);
mrConfig = new MRConfig();
mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*");
mrConfig.historyServerUrl = getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null);
}
- if (config.hasPath("dataSourceConfig.hdfs") && config.getBoolean("dataSourceConfig.hdfs.enabled")) {
+ if (config.hasPath("dataSourceConfig.hdfs.enabled") && config.getBoolean("dataSourceConfig.hdfs.enabled")) {
topologyTypes.add(TopologyConstants.TopologyType.HDFS);
hdfsConfig = new HdfsConfig();
hdfsConfig.namenodeUrls = config.getString("dataSourceConfig.hdfs.namenodeUrl").split(",\\s*");
@@ -116,6 +117,7 @@ public class TopologyCheckAppConfig implements Serializable {
public String site;
public int numDataFetcherSpout;
public int numEntityPersistBolt;
+ public int numKafkaSinkBolt;
public long fetchDataIntervalInSecs;
public int parseThreadPoolSize;
public Class<? extends TopologyRackResolver> resolverCls;
http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index 2089a2f..b4e3695 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -24,38 +24,51 @@
<configuration>
<!-- org.apache.eagle.topology.TopologyCheckApp -->
<property>
- <name>dataExtractorConfig.fetchDataIntervalInSecs</name>
+ <name>topology.fetchDataIntervalInSecs</name>
<displayName>Fetch Data Interval in Secs</displayName>
<description>fetch data interval in secs</description>
<value>300</value>
</property>
<property>
- <name>dataExtractorConfig.parseThreadPoolSize</name>
+ <name>topology.parseThreadPoolSize</name>
<displayName>Parser Thread Pool Size</displayName>
<description>parser thread pool size</description>
<value>5</value>
</property>
<property>
- <name>dataExtractorConfig.numDataFetcherSpout</name>
+ <name>topology.numDataFetcherSpout</name>
<displayName>Spout Task Number</displayName>
<description>spout task number</description>
<value>1</value>
</property>
<property>
- <name>dataExtractorConfig.numEntityPersistBolt</name>
- <displayName>Bolt Task Number</displayName>
- <description>bolt task number</description>
+ <name>topology.numEntityPersistBolt</name>
+ <displayName>Storage Bolt Task Number</displayName>
+ <description>number of persist tasks to the storage</description>
<value>1</value>
</property>
+ <property>
+ <name>topology.numOfKafkaSinkBolt</name>
+ <displayName>Kafka Sink Task Number</displayName>
+ <value>2</value>
+ <description>number of sinks to alert engine</description>
+ </property>
<property>
- <name>dataExtractorConfig.rackResolverCls</name>
+ <name>topology.rackResolverCls</name>
<displayName>Rack Resolver Class</displayName>
<description>rack resolver class</description>
<value>org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver</value>
</property>
<property>
+ <name>dataSourceConfig.hbase.enabled</name>
+ <displayName>HBase Topology Check Enabled</displayName>
+ <description>HBase topology status check enabled</description>
+ <value>false</value>
+ <required>true</required>
+ </property>
+ <property>
<name>dataSourceConfig.hbase.zkQuorum</name>
<displayName>HBase Zookeeper Quorum</displayName>
<description>hbase zookeeper quorum (optional)</description>
@@ -71,13 +84,6 @@
<description>hbase zookeeper client port (optional)</description>
</property>
-
- <property>
- <name>dataSourceConfig.hdfs.namenodeUrl</name>
- <displayName>Hdfs Namenode Web URL</displayName>
- <description>hdfs namenode web url for HDFS monitor</description>
- <value>http://sandbox.hortonworks.com:50070</value>
- </property>
<property>
<name>dataSourceConfig.hdfs.enabled</name>
<displayName>HDFS Topology Check Enabled</displayName>
@@ -86,6 +92,13 @@
<required>true</required>
</property>
<property>
+ <name>dataSourceConfig.hdfs.namenodeUrl</name>
+ <displayName>Hdfs Namenode Web URL</displayName>
+ <description>hdfs namenode web url for HDFS monitor</description>
+ <value>http://sandbox.hortonworks.com:50070</value>
+ </property>
+
+ <property>
<name>dataSourceConfig.mr.enabled</name>
<displayName>MR Topology Check Enabled</displayName>
<description>MR topology status check enabled</description>
@@ -104,20 +117,7 @@
<description>URL for history server monitor (optional)</description>
<value></value>
</property>
- <property>
- <name>dataSourceConfig.hbase.enabled</name>
- <displayName>HBase Topology Check Enabled</displayName>
- <description>HBase topology status check enabled</description>
- <value>false</value>
- <required>true</required>
- </property>
- <property>
- <name>topology.numOfSinkTasks</name>
- <displayName>topology.numOfSinkTasks</displayName>
- <value>2</value>
- <description>number of sink tasks</description>
- </property>
<!-- data sink configurations -->
<property>
http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
index 5d5f1d3..180e57e 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
@@ -20,15 +20,11 @@
workers : 1,
topology : {
- "numOfSinkTasks" : 2
- }
-
- dataExtractorConfig : {
- "site": "sandbox",
"fetchDataIntervalInSecs": 300,
"parseThreadPoolSize": 5,
"numDataFetcherSpout" : 1,
"numEntityPersistBolt" : 1,
+ "numOfKafkaSinkBolt": 2,
"rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver"
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
index da52f65..c6f17ae 100644
--- a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
@@ -18,13 +18,13 @@
mode : "LOCAL",
workers : 1,
- dataExtractorConfig : {
- "site": "sandbox",
- "fetchDataIntervalInSecs": 15,
+ topology : {
+ "fetchDataIntervalInSecs": 300,
"parseThreadPoolSize": 5,
- "checkRetryTime" : 3,
"numDataFetcherSpout" : 1,
- "numEntityPersistBolt" : 1
+ "numEntityPersistBolt" : 1,
+ "numOfKafkaSinkBolt": 2,
+ "rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver"
}
dataSourceConfig : {