Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/29 07:06:12 UTC

eagle git commit: [MINOR] update SparkHistoryJobApp & TopologyCheckApp configs

Repository: eagle
Updated Branches:
  refs/heads/master 77fbff720 -> ab50e62ac


[MINOR] update SparkHistoryJobApp & TopologyCheckApp configs

1. add a configurable `service.flushLimit` (default 500) to the SparkHistoryJobApp config, replacing the hard-coded FLUSH_LIMIT in JHFSparkEventReader
2. update TopologyCheckAppConfig.java and TopologyCheckApp.java: read settings from the `topology` block instead of `dataExtractorConfig.*`, and replace `topology.numOfSinkTasks` with `topology.numOfKafkaSinkBolt` (resulting config shapes sketched below)
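
For context, a minimal sketch of the configuration shape after this change; the keys and default values are taken from the diff below, other surrounding settings are omitted and unchanged:

    # SparkHistoryJobApp application.conf: the "service" block gains flushLimit
    # (falls back to 500 in SparkHistoryJobAppConfig when the key is absent)
    "service" : {
      basePath : "/rest",
      readTimeOutSeconds : 2,
      flushLimit : 500
    }

    # TopologyCheckApp application.conf: dataExtractorConfig.* keys move under
    # the topology block; numOfSinkTasks is replaced by numOfKafkaSinkBolt
    topology : {
      "fetchDataIntervalInSecs" : 300,
      "parseThreadPoolSize" : 5,
      "numDataFetcherSpout" : 1,
      "numEntityPersistBolt" : 1,
      "numOfKafkaSinkBolt" : 2,
      "rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver"
    }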

Author: Zhao, Qingwen <qi...@apache.org>

Closes #761 from qingwen220/minor.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/ab50e62a
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/ab50e62a
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/ab50e62a

Branch: refs/heads/master
Commit: ab50e62acb2483dd009d87b2d68814e13e3e3d92
Parents: 77fbff7
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Thu Dec 29 15:05:57 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Dec 29 15:05:57 2016 +0800

----------------------------------------------------------------------
 .../spark/history/SparkHistoryJobAppConfig.java |  5 ++
 .../history/crawl/JHFSparkEventReader.java      |  2 +-
 ...spark.history.SparkHistoryJobAppProvider.xml |  6 +++
 .../src/main/resources/application.conf         |  1 +
 .../apache/eagle/topology/TopologyCheckApp.java | 11 ++--
 .../eagle/topology/TopologyCheckAppConfig.java  | 20 ++++----
 ....eagle.topology.TopologyCheckAppProvider.xml | 54 ++++++++++----------
 .../src/main/resources/application.conf         |  6 +--
 .../src/test/resources/application.conf         | 10 ++--
 9 files changed, 64 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index adde60b..9646fb1 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -87,6 +87,10 @@ public class SparkHistoryJobAppConfig implements Serializable {
         if (config.hasPath("service.basePath")) {
             this.eagleInfo.basePath = config.getString("service.basePath");
         }
+        this.eagleInfo.flushLimit = 500;
+        if (config.hasPath("service.flushLimit")) {
+            this.eagleInfo.flushLimit = config.getInt("service.flushLimit");
+        }
 
         this.stormConfig.siteId = config.getString("siteId");
         this.stormConfig.spoutCrawlInterval = config.getInt("topology.spoutCrawlInterval");
@@ -126,5 +130,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
         public String password;
         public String basePath;
         public int timeout;
+        public int flushLimit;
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
index 82e8a41..05e35e4 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -690,7 +690,7 @@ public class JHFSparkEventReader {
     private void flushEntities(Collection entities, boolean forceFlush) {
         this.createEntities.addAll(entities);
 
-        if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
+        if (forceFlush || this.createEntities.size() >= config.eagleInfo.flushLimit) {
             try {
                 this.doFlush(this.createEntities);
                 this.createEntities.clear();

http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index 17a3a4a..8c0d472 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -65,6 +65,12 @@
             <description>default timeout is 30s</description>
             <value>300</value>
         </property>
+        <property>
+            <name>service.flushLimit</name>
+            <displayName>service flushing limit</displayName>
+            <description>flushing entities limit</description>
+            <value>500</value>
+        </property>
 
         <!-- datasource config -->
         <property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 2839915..62f66f6 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -28,6 +28,7 @@
     password : "secret",
     basePath : "/rest",
     readTimeOutSeconds : 2
+    flushLimit: 500
   },
 
   "zookeeper" : {

http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
index ba5914b..87ff27a 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java
@@ -30,7 +30,6 @@ import org.apache.eagle.topology.storm.TopologyDataPersistBolt;
 
 public class TopologyCheckApp extends StormApplication {
 
-    private static final String SINK_TASK_NUM = "topology.numOfSinkTasks";
     private static final String TOPOLOGY_HEALTH_CHECK_STREAM = "topology_health_check_stream";
 
     @Override
@@ -41,7 +40,6 @@ public class TopologyCheckApp extends StormApplication {
         String persistBoltName = TopologyCheckAppConfig.TOPOLOGY_ENTITY_PERSIST_BOLT_NAME;
         String parseBoltName = TopologyCheckAppConfig.PARSE_BOLT_NAME;
         String kafkaSinkBoltName = TopologyCheckAppConfig.SINK_BOLT_NAME;
-        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
 
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout(
@@ -59,10 +57,15 @@ public class TopologyCheckApp extends StormApplication {
         topologyBuilder.setBolt(
             parseBoltName,
             new HealthCheckParseBolt(),
-            topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(persistBoltName);
+            topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt
+        ).setNumTasks(topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(persistBoltName);
 
         StormStreamSink<?> sinkBolt = environment.getStreamSink(TOPOLOGY_HEALTH_CHECK_STREAM, config);
-        topologyBuilder.setBolt(kafkaSinkBoltName, sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks).shuffleGrouping(parseBoltName);
+        topologyBuilder.setBolt(
+                kafkaSinkBoltName,
+                sinkBolt,
+                topologyCheckAppConfig.dataExtractorConfig.numKafkaSinkBolt
+        ).setNumTasks(topologyCheckAppConfig.dataExtractorConfig.numKafkaSinkBolt).shuffleGrouping(parseBoltName);
 
         return topologyBuilder.createTopology();
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index f6d61f6..90a3773 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -69,14 +69,15 @@ public class TopologyCheckAppConfig implements Serializable {
         this.config = config;
 
         this.dataExtractorConfig.site = config.getString("siteId");
-        this.dataExtractorConfig.fetchDataIntervalInSecs = config.getLong("dataExtractorConfig.fetchDataIntervalInSecs");
+        this.dataExtractorConfig.fetchDataIntervalInSecs = config.getLong("topology.fetchDataIntervalInSecs");
         this.dataExtractorConfig.parseThreadPoolSize = MAX_NUM_THREADS;
-        if (config.hasPath("dataExtractorConfig.parseThreadPoolSize")) {
-            this.dataExtractorConfig.parseThreadPoolSize = config.getInt("dataExtractorConfig.parseThreadPoolSize");
+        if (config.hasPath("topology.parseThreadPoolSize")) {
+            this.dataExtractorConfig.parseThreadPoolSize = config.getInt("topology.parseThreadPoolSize");
         }
-        this.dataExtractorConfig.numDataFetcherSpout = config.getInt("dataExtractorConfig.numDataFetcherSpout");
-        this.dataExtractorConfig.numEntityPersistBolt = config.getInt("dataExtractorConfig.numEntityPersistBolt");
-        String resolveCls = config.getString("dataExtractorConfig.rackResolverCls");
+        this.dataExtractorConfig.numDataFetcherSpout = config.getInt("topology.numDataFetcherSpout");
+        this.dataExtractorConfig.numEntityPersistBolt = config.getInt("topology.numEntityPersistBolt");
+        this.dataExtractorConfig.numKafkaSinkBolt = config.getInt("topology.numOfKafkaSinkBolt");
+        String resolveCls = config.getString("topology.rackResolverCls");
         try {
             this.dataExtractorConfig.resolverCls = (Class<? extends TopologyRackResolver>) Class.forName(resolveCls);
         } catch (ClassNotFoundException e) {
@@ -85,7 +86,7 @@ public class TopologyCheckAppConfig implements Serializable {
             //e.printStackTrace();
         }
 
-        if (config.hasPath("dataSourceConfig.hbase") && config.getBoolean("dataSourceConfig.hbase.enabled")) {
+        if (config.hasPath("dataSourceConfig.hbase.enabled") && config.getBoolean("dataSourceConfig.hbase.enabled")) {
             topologyTypes.add(TopologyConstants.TopologyType.HBASE);
             hBaseConfig = new HBaseConfig();
 
@@ -98,14 +99,14 @@ public class TopologyCheckAppConfig implements Serializable {
             hBaseConfig.hbaseMasterPrincipal = getOptionalConfig("dataSourceConfig.hbase.kerberos.master.principal", null);
         }
 
-        if (config.hasPath("dataSourceConfig.mr") && config.getBoolean("dataSourceConfig.mr.enabled")) {
+        if (config.hasPath("dataSourceConfig.mr.enabled") && config.getBoolean("dataSourceConfig.mr.enabled")) {
             topologyTypes.add(TopologyConstants.TopologyType.MR);
             mrConfig = new MRConfig();
             mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*");
             mrConfig.historyServerUrl = getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null);
         }
 
-        if (config.hasPath("dataSourceConfig.hdfs") && config.getBoolean("dataSourceConfig.hdfs.enabled")) {
+        if (config.hasPath("dataSourceConfig.hdfs.enabled") && config.getBoolean("dataSourceConfig.hdfs.enabled")) {
             topologyTypes.add(TopologyConstants.TopologyType.HDFS);
             hdfsConfig = new HdfsConfig();
             hdfsConfig.namenodeUrls = config.getString("dataSourceConfig.hdfs.namenodeUrl").split(",\\s*");
@@ -116,6 +117,7 @@ public class TopologyCheckAppConfig implements Serializable {
         public String site;
         public int numDataFetcherSpout;
         public int numEntityPersistBolt;
+        public int numKafkaSinkBolt;
         public long fetchDataIntervalInSecs;
         public int parseThreadPoolSize;
         public Class<? extends TopologyRackResolver> resolverCls;

http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index 2089a2f..b4e3695 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -24,38 +24,51 @@
     <configuration>
         <!-- org.apache.eagle.topology.TopologyCheckApp -->
         <property>
-            <name>dataExtractorConfig.fetchDataIntervalInSecs</name>
+            <name>topology.fetchDataIntervalInSecs</name>
             <displayName>Fetch Data Interval in Secs</displayName>
             <description>fetch data interval in secs</description>
             <value>300</value>
         </property>
         <property>
-            <name>dataExtractorConfig.parseThreadPoolSize</name>
+            <name>topology.parseThreadPoolSize</name>
             <displayName>Parser Thread Pool Size</displayName>
             <description>parser thread pool size</description>
             <value>5</value>
         </property>
         <property>
-            <name>dataExtractorConfig.numDataFetcherSpout</name>
+            <name>topology.numDataFetcherSpout</name>
             <displayName>Spout Task Number</displayName>
             <description>spout task number</description>
             <value>1</value>
         </property>
         <property>
-            <name>dataExtractorConfig.numEntityPersistBolt</name>
-            <displayName>Bolt Task Number</displayName>
-            <description>bolt task number</description>
+            <name>topology.numEntityPersistBolt</name>
+            <displayName>Storage Bolt Task Number</displayName>
+            <description>number of persist tasks to the storage</description>
             <value>1</value>
         </property>
+        <property>
+            <name>topology.numOfKafkaSinkBolt</name>
+            <displayName>Kafka Sink Task Number</displayName>
+            <value>2</value>
+            <description>number of sinks to alert engine</description>
+        </property>
 
         <property>
-            <name>dataExtractorConfig.rackResolverCls</name>
+            <name>topology.rackResolverCls</name>
             <displayName>Rack Resolver Class</displayName>
             <description>rack resolver class</description>
             <value>org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver</value>
         </property>
 
         <property>
+            <name>dataSourceConfig.hbase.enabled</name>
+            <displayName>HBase Topology Check Enabled</displayName>
+            <description>HBase topology status check enabled</description>
+            <value>false</value>
+            <required>true</required>
+        </property>
+        <property>
             <name>dataSourceConfig.hbase.zkQuorum</name>
             <displayName>HBase Zookeeper Quorum</displayName>
             <description>hbase zookeeper quorum (optional)</description>
@@ -71,13 +84,6 @@
             <description>hbase zookeeper client port (optional)</description>
         </property>
 
-
-        <property>
-            <name>dataSourceConfig.hdfs.namenodeUrl</name>
-            <displayName>Hdfs Namenode Web URL</displayName>
-            <description>hdfs namenode web url for HDFS monitor</description>
-            <value>http://sandbox.hortonworks.com:50070</value>
-        </property>
         <property>
             <name>dataSourceConfig.hdfs.enabled</name>
             <displayName>HDFS Topology Check Enabled</displayName>
@@ -86,6 +92,13 @@
             <required>true</required>
         </property>
         <property>
+            <name>dataSourceConfig.hdfs.namenodeUrl</name>
+            <displayName>Hdfs Namenode Web URL</displayName>
+            <description>hdfs namenode web url for HDFS monitor</description>
+            <value>http://sandbox.hortonworks.com:50070</value>
+        </property>
+
+        <property>
             <name>dataSourceConfig.mr.enabled</name>
             <displayName>MR Topology Check Enabled</displayName>
             <description>MR topology status check enabled</description>
@@ -104,20 +117,7 @@
             <description>URL for history server monitor (optional)</description>
             <value></value>
         </property>
-        <property>
-            <name>dataSourceConfig.hbase.enabled</name>
-            <displayName>HBase Topology Check Enabled</displayName>
-            <description>HBase topology status check enabled</description>
-            <value>false</value>
-            <required>true</required>
-        </property>
 
-        <property>
-            <name>topology.numOfSinkTasks</name>
-            <displayName>topology.numOfSinkTasks</displayName>
-            <value>2</value>
-            <description>number of sink tasks</description>
-        </property>
 
         <!-- data sink configurations -->
         <property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
index 5d5f1d3..180e57e 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
@@ -20,15 +20,11 @@
   workers : 1,
 
   topology : {
-    "numOfSinkTasks" : 2
-  }
-
-  dataExtractorConfig : {  
-    "site": "sandbox",
     "fetchDataIntervalInSecs": 300,
     "parseThreadPoolSize": 5,
     "numDataFetcherSpout" : 1,
     "numEntityPersistBolt" : 1,
+    "numOfKafkaSinkBolt": 2,
     "rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver"
   }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
index da52f65..c6f17ae 100644
--- a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
@@ -18,13 +18,13 @@
   mode : "LOCAL",
   workers : 1,
 
-  dataExtractorConfig : {
-    "site": "sandbox",
-    "fetchDataIntervalInSecs": 15,
+  topology : {
+    "fetchDataIntervalInSecs": 300,
     "parseThreadPoolSize": 5,
-    "checkRetryTime" : 3,
     "numDataFetcherSpout" : 1,
-    "numEntityPersistBolt" : 1
+    "numEntityPersistBolt" : 1,
+    "numOfKafkaSinkBolt": 2,
+    "rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver"
   }
 
   dataSourceConfig : {