Posted to commits@eagle.apache.org by qi...@apache.org on 2016/11/16 09:40:23 UTC

incubator-eagle git commit: [EAGLE-780] Update spark running config to integrate with the application framework

Repository: incubator-eagle
Updated Branches:
  refs/heads/master fbac5c3cc -> 8b648011d


[EAGLE-780] Update spark running config to integrate with the application framework

https://issues.apache.org/jira/browse/EAGLE-780

Author: Zhao, Qingwen <qi...@apache.org>
Author: Qingwen Zhao <qi...@gmail.com>

Closes #659 from qingwen220/EAGLE-780.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8b648011
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8b648011
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8b648011

Branch: refs/heads/master
Commit: 8b648011df4b9f10394ac853978256f0e62e014d
Parents: fbac5c3
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Wed Nov 16 17:40:16 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Wed Nov 16 17:40:16 2016 +0800

----------------------------------------------------------------------
 ...e.alert.app.AlertUnitTopologyAppProvider.xml |   4 +-
 .../publisher/AlertKafkaPublisherTest.java      |   9 +-
 .../spark/history/SparkHistoryJobAppConfig.java |   4 +-
 ...spark.history.SparkHistoryJobAppProvider.xml |  27 +--
 .../src/main/resources/application.conf         |   7 +-
 .../jpm/spark/running/SparkRunningJobApp.java   |  10 +-
 .../spark/running/SparkRunningJobAppConfig.java |  94 ++++------
 ...spark.running.SparkRunningJobAppProvider.xml | 183 +++++--------------
 .../src/main/resources/application.conf         |  48 +++--
 ....security.hbase.HBaseAuditLogAppProvider.xml |  39 ++--
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml |   8 +-
 .../eagle/topology/TopologyCheckAppConfig.java  |   2 +-
 ....eagle.topology.TopologyCheckAppProvider.xml |   6 -
 .../src/main/resources/application.conf         |   1 +
 14 files changed, 149 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 3c8d58e..8ee8b6b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -68,8 +68,8 @@
         <property>
             <name>topology.message.timeout.secs</name>
             <displayName>topology message timeout (secs)</displayName>
-            <description>default timeout is 30s</description>
-            <value>30</value>
+            <description>default timeout is 300s</description>
+            <value>300</value>
             <required>false</required>
         </property>
 

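The first hunk raises the alert topology's message timeout from 30s to 300s, giving slow alert publishers more time to ack tuples before Storm fails and replays them. For reference, a minimal sketch of setting the same knob programmatically, assuming the backtype.storm.Config API of the Storm version Eagle targeted (hypothetical standalone example, not code from this commit):

    import backtype.storm.Config;

    public class TimeoutSketch {
        public static Config alertTopologyConf() {
            Config stormConf = new Config();
            // Same effect as topology.message.timeout.secs = 300: a tuple that
            // is not fully acked within 300 seconds is failed and replayed.
            stormConf.setMessageTimeoutSecs(300);
            return stormConf;
        }
    }
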
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
index aaa1e80..ddf2001 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
@@ -28,10 +28,7 @@ import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
 import org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher;
 import org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer;
 import org.apache.eagle.alert.utils.KafkaEmbedded;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
 
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
@@ -69,7 +66,7 @@ public class AlertKafkaPublisherTest {
         }
     }
 
-    @Test
+    @Test @Ignore
     public void testAsync() throws Exception {
         AlertKafkaPublisher publisher = new AlertKafkaPublisher();
         Map<String, Object> properties = new HashMap<>();
@@ -104,7 +101,7 @@ public class AlertKafkaPublisherTest {
         publisher.close();
     }
 
-    @Test
+    @Test @Ignore
     public void testSync() throws Exception {
         AlertKafkaPublisher publisher = new AlertKafkaPublisher();
         Map<String, Object> properties = new HashMap<>();

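Note the Kafka publisher tests are disabled with JUnit 4's @Ignore rather than deleted, so they still compile and show up as "ignored" in test reports. A minimal sketch of the semantics (hypothetical test class, not code from this commit):

    import org.junit.Ignore;
    import org.junit.Test;

    public class IgnoreSketch {
        @Test
        public void runs() {
            // executed normally by the JUnit 4 runner
        }

        @Test @Ignore("needs an embedded Kafka broker")
        public void skipped() {
            // compiled but never executed; reported as ignored
        }
    }
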
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 86f13ff..adde60b 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -31,7 +31,7 @@ public class SparkHistoryJobAppConfig implements Serializable {
     static final String SPARK_HISTORY_JOB_FETCH_SPOUT_NAME = "sparkHistoryJobFetchSpout";
     static final String SPARK_HISTORY_JOB_PARSE_BOLT_NAME = "sparkHistoryJobParseBolt";
 
-    static final String DEFAULT_SPARK_JOB_HISTORY_ZOOKEEPER_ROOT = "/eagle/sparkJobHistory";
+    static final String DEFAULT_SPARK_JOB_HISTORY_ZOOKEEPER_ROOT = "/apps/spark/history";
 
     public ZKStateConfig zkStateConfig;
     public JobHistoryEndpointConfig jobHistoryConfig;
@@ -70,7 +70,7 @@ public class SparkHistoryJobAppConfig implements Serializable {
         }
 
         jobHistoryConfig.rms = config.getString("dataSourceConfig.rm.url").split(",\\s*");
-        jobHistoryConfig.baseDir = config.getString("dataSourceConfig.baseDir");
+        jobHistoryConfig.baseDir = config.getString("dataSourceConfig.hdfs.baseDir");
         for (Map.Entry<String, ConfigValue> entry : config.getConfig("dataSourceConfig.hdfs").entrySet()) {
             this.jobHistoryConfig.hdfs.put(entry.getKey(), entry.getValue().unwrapped().toString());
         }

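Moving baseDir under dataSourceConfig.hdfs keeps every HDFS-facing setting in one HOCON object, so the existing loop that copies dataSourceConfig.hdfs entries into a map picks it up without extra code. A minimal sketch of that read pattern with the Typesafe Config API, using hypothetical values:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import com.typesafe.config.ConfigValue;

    import java.util.HashMap;
    import java.util.Map;

    public class HdfsConfigSketch {
        public static void main(String[] args) {
            Config config = ConfigFactory.parseString(
                "dataSourceConfig.hdfs { fs.defaultFS = \"hdfs://nn:8020\", baseDir = \"/spark-history\" }");
            String baseDir = config.getString("dataSourceConfig.hdfs.baseDir");
            // entrySet() flattens the block into dotted leaf paths,
            // e.g. "fs.defaultFS" and "baseDir".
            Map<String, String> hdfs = new HashMap<>();
            for (Map.Entry<String, ConfigValue> entry : config.getConfig("dataSourceConfig.hdfs").entrySet()) {
                hdfs.put(entry.getKey(), entry.getValue().unwrapped().toString());
            }
            System.out.println(baseDir + " -> " + hdfs);
        }
    }
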
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index 8159edc..c68d4e8 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -65,31 +65,6 @@
             <description>default timeout is 30s</description>
             <value>300</value>
         </property>
-        <!-- zookeeper config -->
-        <property>
-            <name>zkStateConfig.zkQuorum</name>
-            <displayName>zookeeper quorum list</displayName>
-            <description>zookeeper to store topology metadata</description>
-            <value>sandbox.hortonworks.com:2181</value>
-        </property>
-        <property>
-            <name>zkStateConfig.zkSessionTimeoutMs</name>
-            <displayName>zookeeper session timeout (ms)</displayName>
-            <description>Zookeeper session timeoutMs</description>
-            <value>15000</value>
-        </property>
-        <property>
-            <name>zkStateConfig.zkRetryTimes</name>
-            <displayName>zookeeper connection retry times</displayName>
-            <description>retry times for zookeeper connection</description>
-            <value>3</value>
-        </property>
-        <property>
-            <name>zkStateConfig.zkRetryInterval</name>
-            <displayName>zookeeper connection retry interval</displayName>
-            <description>retry interval for zookeeper connection</description>
-            <value>20000</value>
-        </property>
 
         <!-- datasource config -->
         <property>
@@ -107,7 +82,7 @@
             <required>true</required>
         </property>
         <property>
-            <name>dataSourceConfig.baseDir</name>
+            <name>dataSourceConfig.hdfs.baseDir</name>
             <displayName>hdfs base path for spark job data</displayName>
             <description>hdfs base path for spark job data</description>
             <value>/spark-history</value>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 5f3fdac..a51abc9 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -29,9 +29,10 @@
     basePath : "/rest",
     readTimeOutSeconds : 2
   },
-  "zkStateConfig" : {
+
+  "zookeeper" : {
     "zkQuorum" : "sandbox.hortonworks.com:2181",
-    "zkRoot" : "/sparkJobHistory",
+    "zkRoot" : "/apps/spark/running",
     "zkSessionTimeoutMs" : 15000,
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000,
@@ -39,9 +40,9 @@
 
   "dataSourceConfig":{
     rm.url: "http://sandbox.hortonworks.com:8088",
-    "baseDir" : "/spark-history",
     hdfs: {
       fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020",
+      baseDir : "/spark-history",
       #if not need, then do not set
       # hdfs.kerberos.principal = ,
       # hdfs.keytab.file =

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
index 2ee2a04..209481a 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
@@ -30,14 +30,14 @@ public class SparkRunningJobApp extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
         //1. trigger init conf
-        SparkRunningJobAppConfig sparkRunningJobAppConfig = SparkRunningJobAppConfig.getInstance(config);
+        SparkRunningJobAppConfig sparkRunningJobAppConfig = SparkRunningJobAppConfig.newInstance(config);
 
         //2. init topology
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         final String spoutName = SparkRunningJobAppConfig.JOB_FETCH_SPOUT_NAME;
         final String boltName = SparkRunningJobAppConfig.JOB_PARSE_BOLT_NAME;
-        int parallelism = sparkRunningJobAppConfig.getTopologyConfig().jobFetchSpoutParallism;
-        int tasks = sparkRunningJobAppConfig.getTopologyConfig().jobFetchSpoutTasksNum;
+        int parallelism = sparkRunningJobAppConfig.getJobExtractorConfig().jobFetchSpoutParallism;
+        int tasks = sparkRunningJobAppConfig.getJobExtractorConfig().jobFetchSpoutTasksNum;
         if (parallelism > tasks) {
             parallelism = tasks;
         }
@@ -50,8 +50,8 @@ public class SparkRunningJobApp extends StormApplication {
                 parallelism
         ).setNumTasks(tasks);
 
-        parallelism = sparkRunningJobAppConfig.getTopologyConfig().jobParseBoltParallism;
-        tasks = sparkRunningJobAppConfig.getTopologyConfig().jobParseBoltTasksNum;
+        parallelism = sparkRunningJobAppConfig.getJobExtractorConfig().jobParseBoltParallism;
+        tasks = sparkRunningJobAppConfig.getJobExtractorConfig().jobParseBoltTasksNum;
         if (parallelism > tasks) {
             parallelism = tasks;
         }

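The topology wiring now pulls spout and bolt sizing from jobExtractorConfig, and it still caps the parallelism hint at the task count, since a Storm component cannot run more executors than tasks. A minimal sketch of that pattern, assuming Storm's TopologyBuilder API and a hypothetical spout (not the commit's SparkRunningJobFetchSpout):

    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.TopologyBuilder;

    public class SizingSketch {
        public static TopologyBuilder wire(IRichSpout spout, int executors, int tasks) {
            // each executor runs at least one task, so executors <= tasks
            int parallelism = Math.min(executors, tasks);
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("fetchSpout", spout, parallelism).setNumTasks(tasks);
            return builder;
        }
    }
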
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index 6855b8e..3ae4a35 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -19,7 +19,6 @@
 package org.apache.eagle.jpm.spark.running;
 
 import com.typesafe.config.ConfigValue;
-import org.apache.eagle.common.config.ConfigOptionParser;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,16 +32,13 @@ public class SparkRunningJobAppConfig implements Serializable {
     static final String JOB_FETCH_SPOUT_NAME = "sparkRunningJobFetchSpout";
     static final String JOB_PARSE_BOLT_NAME = "sparkRunningJobParseBolt";
 
+    static final String DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT = "/apps/spark/running";
+
     ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
 
     private ZKStateConfig zkStateConfig;
-    private TopologyConfig topologyConfig;
-
-    public TopologyConfig getTopologyConfig() {
-        return topologyConfig;
-    }
 
     public EagleServiceConfig getEagleServiceConfig() {
         return eagleServiceConfig;
@@ -62,20 +58,12 @@ public class SparkRunningJobAppConfig implements Serializable {
 
     private EndpointConfig endpointConfig;
 
-    public static class TopologyConfig implements Serializable {
-        public int jobFetchSpoutParallism;
-        public int jobFetchSpoutTasksNum;
-        public int jobParseBoltParallism;
-        public int jobParseBoltTasksNum;
-    }
-
     public static class ZKStateConfig implements Serializable {
         public String zkQuorum;
         public String zkRoot;
         public int zkSessionTimeoutMs;
         public int zkRetryTimes;
         public int zkRetryInterval;
-        public String zkPort;
         public boolean recoverEnabled;
     }
 
@@ -92,6 +80,10 @@ public class SparkRunningJobAppConfig implements Serializable {
         public String site;
         public int fetchRunningJobInterval;
         public int parseThreadPoolSize;
+        public int jobFetchSpoutParallism;
+        public int jobFetchSpoutTasksNum;
+        public int jobParseBoltParallism;
+        public int jobParseBoltTasksNum;
     }
 
     public static class EndpointConfig implements Serializable {
@@ -106,70 +98,62 @@ public class SparkRunningJobAppConfig implements Serializable {
 
     private Config config;
 
-    private static SparkRunningJobAppConfig manager = new SparkRunningJobAppConfig();
-
-    private SparkRunningJobAppConfig() {
+    private SparkRunningJobAppConfig(Config config) {
         this.eagleServiceConfig = new EagleServiceConfig();
         this.jobExtractorConfig = new JobExtractorConfig();
         this.endpointConfig = new EndpointConfig();
         this.endpointConfig.hdfs = new HashMap<>();
         this.zkStateConfig = new ZKStateConfig();
-        this.topologyConfig = new TopologyConfig();
-    }
-
-    public static SparkRunningJobAppConfig getInstance(String[] args) {
-        try {
-            LOG.info("Loading from configuration file");
-            manager.init(new ConfigOptionParser().load(args));
-        } catch (Exception e) {
-            LOG.error("failed to load config");
-        }
-        return manager;
+        init(config);
     }
 
-    public static SparkRunningJobAppConfig getInstance(Config config) {
-        manager.init(config);
-        return manager;
+    public static SparkRunningJobAppConfig newInstance(Config config) {
+        return new SparkRunningJobAppConfig(config);
     }
 
     private void init(Config config) {
         this.config = config;
-        this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
-        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
-        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
-        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
-        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
-        this.zkStateConfig.recoverEnabled = config.getBoolean("zookeeperConfig.recoverEnabled");
 
+        this.zkStateConfig.zkQuorum = config.getString("zookeeper.zkQuorum");
+        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
+        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT;
+        if (config.hasPath("zookeeper.zkRoot")) {
+            this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
+        }
+        this.zkStateConfig.recoverEnabled = false;
+        if (config.hasPath("jobExtractorConfig.recoverEnabled")) {
+            this.zkStateConfig.recoverEnabled = config.getBoolean("jobExtractorConfig.recoverEnabled");
+        }
 
         // parse eagle service endpoint
-        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
-        String port = config.getString("eagleProps.eagleService.port");
-        this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
-        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
-        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
-        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
-        this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+        this.eagleServiceConfig.eagleServiceHost = config.getString("service.host");
+        this.eagleServiceConfig.eagleServicePort = config.getInt("service.port");
+        this.eagleServiceConfig.username = config.getString("service.username");
+        this.eagleServiceConfig.password = config.getString("service.password");
+        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("service.readTimeOutSeconds");
+        this.eagleServiceConfig.maxFlushNum = 500;
+        if (config.hasPath("service.maxFlushNum")) {
+            this.eagleServiceConfig.maxFlushNum = config.getInt("service.maxFlushNum");
+        }
 
         //parse job extractor
-        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.site = config.getString("siteId");
         this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
         this.jobExtractorConfig.parseThreadPoolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize");
+        this.jobExtractorConfig.jobFetchSpoutParallism = config.getInt("jobExtractorConfig.numOfSpoutExecutors");
+        this.jobExtractorConfig.jobFetchSpoutTasksNum = config.getInt("jobExtractorConfig.numOfSpoutTasks");
+        this.jobExtractorConfig.jobParseBoltParallism = config.getInt("jobExtractorConfig.numOfParseBoltExecutors");
+        this.jobExtractorConfig.jobParseBoltTasksNum = config.getInt("jobExtractorConfig.numOfParserBoltTasks");
 
         //parse endpointConfig config
-        this.endpointConfig.eventLog = config.getString("endpointConfig.eventLog");
-        for (Map.Entry<String, ConfigValue> entry : config.getConfig("endpointConfig.hdfs").entrySet()) {
+        this.endpointConfig.rmUrls = config.getString("dataSourceConfig.rmUrls").split(",");
+        this.endpointConfig.eventLog = config.getString("dataSourceConfig.hdfs.baseDir");
+        for (Map.Entry<String, ConfigValue> entry : config.getConfig("dataSourceConfig.hdfs").entrySet()) {
             this.endpointConfig.hdfs.put(entry.getKey(), entry.getValue().unwrapped().toString());
         }
 
-        this.endpointConfig.rmUrls = config.getString("endpointConfig.rmUrls").split(",");
-
-        this.topologyConfig.jobFetchSpoutParallism = config.getInt("envContextConfig.parallelismConfig." + JOB_FETCH_SPOUT_NAME);
-        this.topologyConfig.jobFetchSpoutTasksNum = config.getInt("envContextConfig.tasks." + JOB_FETCH_SPOUT_NAME);
-        this.topologyConfig.jobParseBoltParallism = config.getInt("envContextConfig.parallelismConfig." + JOB_PARSE_BOLT_NAME);
-        this.topologyConfig.jobParseBoltTasksNum = config.getInt("envContextConfig.tasks." + JOB_PARSE_BOLT_NAME);
-
         LOG.info("Successfully initialized SparkRunningJobAppConfig");
         LOG.info("site: " + this.jobExtractorConfig.site);
         LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);

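Two patterns recur in the rewritten config class: the mutable static singleton (getInstance) gives way to a newInstance factory that returns a fresh, fully initialized object per call, and optional settings such as zookeeper.zkRoot and service.maxFlushNum fall back to hard-coded defaults via hasPath instead of throwing on a missing path. A minimal sketch of that fallback idiom as a hypothetical helper (not code from this commit), assuming the Typesafe Config API:

    import com.typesafe.config.Config;

    public final class ConfigDefaults {
        private ConfigDefaults() {
        }

        // e.g. getIntOrDefault(config, "service.maxFlushNum", 500)
        public static int getIntOrDefault(Config config, String path, int fallback) {
            return config.hasPath(path) ? config.getInt(path) : fallback;
        }

        // e.g. getStringOrDefault(config, "zookeeper.zkRoot", "/apps/spark/running")
        public static String getStringOrDefault(Config config, String path, String fallback) {
            return config.hasPath(path) ? config.getString(path) : fallback;
        }
    }
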
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
index 0726972..0503d74 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml
@@ -23,172 +23,89 @@
     <appClass>org.apache.eagle.jpm.spark.running.SparkRunningJobApp</appClass>
     <configuration>
         <!-- org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig -->
+
         <property>
-            <name>envContextConfig.env</name>
-            <value>local</value>
-            <displayName>Environment</displayName>
-            <description>Execution environment</description>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkQuorum</name>
-            <displayName>zkQuorum</displayName>
-            <description>Zookeeper Quorum</description>
-            <value>sandbox.hortonworks.com:2181</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkPort</name>
-            <displayName>zkPort</displayName>
-            <description>Zookeeper Port</description>
-            <value>2181</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkSessionTimeoutMs</name>
-            <displayName>zkSessionTimeoutMs</displayName>
-            <description>Zookeeper session timeoutMs</description>
-            <value>15000</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkRetryTimes</name>
-            <displayName>zkRetryTimes</displayName>
-            <description>zookeeperConfig.zkRetryTimes</description>
-            <value>3</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkRetryInterval</name>
-            <displayName>zkRetryInterval</displayName>
-            <description>zookeeperConfig.zkRetryInterval</description>
-            <value>20000</value>
+            <name>dataSourceConfig.rmUrls</name>
+            <displayName>resource manager url</displayName>
+            <description>url to fetch finished spark job list</description>
+            <value>http://sandbox.hortonworks.com:8088</value>
+            <required>true</required>
         </property>
         <property>
-            <name>zookeeperConfig.zkRoot</name>
-            <value>/apps/spark/running</value>
+            <name>dataSourceConfig.hdfs.fs.defaultFS</name>
+            <displayName>hdfs url</displayName>
+            <description>target hdfs to crawl log data</description>
+            <value>hdfs://sandbox.hortonworks.com:8020</value>
+            <required>true</required>
         </property>
         <property>
-            <name>zookeeperConfig.recoverEnabled</name>
-            <description>zookeeperConfig.recoverEnabled</description>
-            <value>false</value>
+            <name>dataSourceConfig.hdfs.baseDir</name>
+            <displayName>hdfs base path for spark job data</displayName>
+            <description>hdfs base path for spark job data</description>
+            <value>/spark-history</value>
+            <required>true</required>
         </property>
+
         <property>
-            <name>eagleProps.eagleService.host</name>
-            <description>eagleProps.eagleService.host</description>
-            <value>sandbox.hortonworks.com</value>
+            <name>workers</name>
+            <displayName>topology workers</displayName>
+            <description>topology workers</description>
+            <value>1</value>
         </property>
         <property>
-            <name>eagleProps.eagleService.port</name>
-            <description>eagleProps.eagleService.port</description>
-            <value>9099</value>
+            <name>jobExtractorConfig.numOfSpoutExecutors</name>
+            <displayName>spout executors</displayName>
+            <description>Parallelism of sparkRunningJobFetchSpout </description>
+            <value>1</value>
         </property>
         <property>
-            <name>eagleProps.eagleService.username</name>
-            <description>eagleProps.eagleService.username</description>
-            <value>admin</value>
+            <name>jobExtractorConfig.numOfSpoutTasks</name>
+            <displayName>spout tasks</displayName>
+            <description>Tasks Num of sparkRunningJobFetchSpout </description>
+            <value>4</value>
         </property>
         <property>
-            <name>eagleProps.eagleService.password</name>
-            <description>eagleProps.eagleService.password</description>
-            <value>secret</value>
+            <name>jobExtractorConfig.numOfParseBoltExecutors</name>
+            <displayName>parser bolt parallelism hint</displayName>
+            <description>Parallelism of sparkRunningJobParseBolt </description>
+            <value>1</value>
         </property>
         <property>
-            <name>eagleProps.eagleService.readTimeOutSeconds</name>
-            <description>eagleProps.eagleService.readTimeOutSeconds</description>
-            <value>20</value>
+            <name>jobExtractorConfig.numOfParserBoltTasks</name>
+            <displayName>parser bolt tasks</displayName>
+            <description>Tasks Num of sparkRunningJobParseBolt</description>
+            <value>4</value>
         </property>
         <property>
-            <name>eagleProps.eagleService.maxFlushNum</name>
-            <description>eagleProps.eagleService.maxFlushNum</description>
-            <value>500</value>
+            <name>topology.message.timeout.secs</name>
+            <displayName>topology message timeout (secs)</displayName>
+            <description>default timeout is 30s</description>
+            <value>30</value>
         </property>
+
         <property>
-            <name>jobExtractorConfig.site</name>
-            <description>jobExtractorConfig.site</description>
-            <value>sandbox</value>
+            <name>jobExtractorConfig.recoverEnabled</name>
+            <displayName>recover enabled</displayName>
+            <description>if recover is needed when restart</description>
+            <value>false</value>
         </property>
         <property>
             <name>jobExtractorConfig.fetchRunningJobInterval</name>
-            <description>jobExtractorConfig.fetchRunningJobInterval</description>
+            <displayName>spout fetch data interval</displayName>
+            <description>spout fetch data interval (in milliseconds)</description>
             <value>15</value>
         </property>
         <property>
             <name>jobExtractorConfig.parseThreadPoolSize</name>
-            <description>jobExtractorConfig.parseThreadPoolSize</description>
+            <displayName>thread pool size for data parsing</displayName>
+            <description>thread pool size for data parsing</description>
             <value>5</value>
         </property>
-        <property>
-            <name>dataSourceConfig.eventLog</name>
-            <description>dataSourceConfig.eventLog</description>
-            <value>/spark-history</value>
-        </property>
-        <property>
-            <name>dataSourceConfig.nnEndpoint</name>
-            <description>dataSourceConfig.nnEndpoint</description>
-            <value>hdfs://sandbox.hortonworks.com:8020</value>
-        </property>
-        <property>
-            <name>dataSourceConfig.keytab</name>
-            <description>dataSourceConfig.keytab</description>
-            <value></value>
-        </property>
-        <property>
-            <name>dataSourceConfig.principal</name>
-            <description>dataSourceConfig.principal</description>
-            <value></value>
-        </property>
-        <property>
-            <name>dataSourceConfig.rmUrls</name>
-            <description>dataSourceConfig.rmUrls</description>
-            <value>http://sandbox.hortonworks.com:8088</value>
-        </property>
-        <property>
-            <name>envContextConfig.parallelismConfig.sparkRunningJobFetchSpout</name>
-            <description>Parallelism of sparkRunningJobFetchSpout </description>
-            <value>1</value>
-        </property>
-        <property>
-            <name>envContextConfig.tasks.sparkRunningJobFetchSpout</name>
-            <description>Tasks Num of sparkRunningJobFetchSpout </description>
-            <value>4</value>
-        </property>
-        <property>
-            <name>envContextConfig.parallelismConfig.sparkRunningJobParseBolt</name>
-            <description>Parallelism of sparkRunningJobParseBolt </description>
-            <value>1</value>
-        </property>
-        <property>
-            <name>envContextConfig.tasks.sparkRunningJobParseBolt</name>
-            <description>Tasks Num of sparkRunningJobParseBolt</description>
-            <value>4</value>
-        </property>
     </configuration>
     <docs>
         <install>
-            # Step 1: Create source kafka topic named "${site}_example_source_topic"
-
-            ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1
-
-            # Step 2: Set up data collector to flow data into kafka topic in
-
-            ./bin/logstash -f log_collector.conf
-
-            ## `log_collector.conf` sample as following:
-
-            input {
-
-            }
-            filter {
-
-            }
-            output{
-
-            }
-
-            # Step 3: start application
-
-            # Step 4: monitor with featured portal or alert with policies
         </install>
         <uninstall>
-            # Step 1: stop and uninstall application
-            # Step 2: delete kafka topic named "${site}_example_source_topic"
-            # Step 3: stop logstash
         </uninstall>
     </docs>
 </application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
index f0f6d42..ef5bf93 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
@@ -14,31 +14,27 @@
 # limitations under the License.
 
 {
+  "siteId": "sandbox"
   "appId":"sparkRunningJob",
   "mode":"LOCAL",
   "workers" : 3,
-  "envContextConfig" : {
-    "stormConfigFile" : "storm.yaml",
-    "parallelismConfig" : {
-      "sparkRunningJobFetchSpout" : 1,
-      "sparkRunningJobParseBolt" : 4
-    },
-    "tasks" : {
-      "sparkRunningJobFetchSpout" : 1,
-      "sparkRunningJobParseBolt" : 4
-    },
-  },
+  topology.message.timeout.secs: 300,
+
   "jobExtractorConfig" : {
-    "site" : "sandbox",
-    "fetchRunningJobInterval" : 15,
-    "parseThreadPoolSize" : 5
+    numOfSpoutExecutors: 1,
+    numOfSpoutTasks: 4,
+    numOfParseBoltExecutors: 1,
+    numOfParserBoltTasks: 4,
+    fetchRunningJobInterval : 15,
+    parseThreadPoolSize : 5,
+    recoverEnabled: false,
   },
 
-  "endpointConfig" : {
+  "dataSourceConfig" : {
     "rmUrls": "http://sandbox.hortonworks.com:8088",
-    "eventLog" : "/spark-history",
     "hdfs" : {
       fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020",
+      baseDir: "/spark-history",
       #if not need, then do not set
       # hdfs.kerberos.principal = ,
       # hdfs.keytab.file =
@@ -46,7 +42,7 @@
     }
   },
 
-  "zookeeperConfig" : {
+  "zookeeper" : {
     "zkQuorum" : "sandbox.hortonworks.com:2181",
     "zkPort" : "2181",
     "zkRoot" : "/apps/spark/running",
@@ -55,14 +51,12 @@
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000
   },
-  "eagleProps" : {
-    "mailHost" : "abc.com",
-    "mailDebug" : "true",
-    eagleService.host:"sandbox.hortonworks.com",
-    eagleService.port: 9099,
-    eagleService.username: "admin",
-    eagleService.password : "secret",
-    eagleService.readTimeOutSeconds : 20,
-    eagleService.maxFlushNum : 500
-  }
+
+  "service":{
+      host:"sandbox.hortonworks.com",
+      port: 9099,
+      username: "admin",
+      password : "secret",
+      readTimeOutSeconds : 20
+  },
 }

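The reorganized application.conf drops the app-private envContextConfig, eagleProps and zookeeperConfig sections in favor of the framework-wide siteId, topology.message.timeout.secs, zookeeper and service blocks. A minimal sketch of reading the new layout with the Typesafe Config API (hypothetical values):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class NewLayoutSketch {
        public static void main(String[] args) {
            Config config = ConfigFactory.parseString(
                "siteId = sandbox\n"
                    + "zookeeper.zkQuorum = \"sandbox.hortonworks.com:2181\"\n"
                    + "service { host = \"sandbox.hortonworks.com\", port = 9099 }\n"
                    + "dataSourceConfig.rmUrls = \"http://sandbox.hortonworks.com:8088\"");
            String site = config.getString("siteId");
            String[] rmUrls = config.getString("dataSourceConfig.rmUrls").split(",");
            String zkQuorum = config.getString("zookeeper.zkQuorum");
            int servicePort = config.getInt("service.port");
            System.out.println(site + " " + rmUrls.length + " " + zkQuorum + " " + servicePort);
        }
    }
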
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.HBaseAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.HBaseAuditLogAppProvider.xml b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.HBaseAuditLogAppProvider.xml
index 414765d..403518a 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.HBaseAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.HBaseAuditLogAppProvider.xml
@@ -192,34 +192,21 @@
     </streams>
     <docs>
         <install>
-            # Step 1: Create source kafka topic named "${site}_example_source_topic"
-
-            ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1
-
-            # Step 2: Set up data collector to flow data into kafka topic in
-
-            ./bin/logstash -f log_collector.conf
-
-            ## `log_collector.conf` sample as following:
-
-            input {
-
-            }
-            filter {
-
-            }
-            output{
-
-            }
-
-            # Step 3: start application
-
-            # Step 4: monitor with featured portal or alert with policies
+            <b>How to Install</b>
+            <ol>
+                <li>Create two kafka topics: <code>hbase_audit_log_{SITE_ID}, hbase_audit_log_enriched_{SITE_ID}</code></li>
+                <li>Setup a log collecting tool you like to stream audit log into topic <code>hbase_audit_log_{SITE_ID}</code></li>
+                <li>Click "Install" button and edit configurations in general and advanced lists according to your requirements </li>
+                <li>Check the new generated stream <code>HBASE_AUDIT_LOG_ENRICHED_STREAM_{SITE_ID}</code> at Alert -> Streams</li>
+            </ol>
         </install>
         <uninstall>
-            # Step 1: stop and uninstall application
-            # Step 2: delete kafka topic named "${site}_example_source_topic"
-            # Step 3: stop logstash
+            <b>How to Uninstall</b>
+            <ol>
+                <li>Click "Stop" button to stop the running application</li>
+                <li>Remove three kafka topics</li>
+                <li>Click "Uninstall" button which will remove stream <code>HBASE_AUDIT_LOG_ENRICHED_STREAM_{SITE_ID}</code></li>
+            </ol>
         </uninstall>
     </docs>
 </application>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 801a183..1108497 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -62,6 +62,12 @@
             <value>2</value>
             <description>number of sink tasks</description>
         </property>
+        <property>
+            <name>topology.message.timeout.secs</name>
+            <displayName>topology message timeout (secs)</displayName>
+            <description>default timeout is 60s</description>
+            <value>60</value>
+        </property>
 
         <!-- data source configurations -->
         <property>
@@ -203,7 +209,7 @@
         <install>
             <b>How to Install</b>
             <ol>
-                <li>Create three kafka topics: <code>hdfs_audit_log_{SITE_ID}, hdfs_audit_log_enriched_{SITE_ID}, hdfs_audit_log_alert_{SITE_ID}</code></li>
+                <li>Create two kafka topics: <code>hdfs_audit_log_{SITE_ID}, hdfs_audit_log_enriched_{SITE_ID}</code></li>
                 <li>Setup a log collecting tool you like to stream audit log into topic <code>hdfs_audit_log_{SITE_ID}</code></li>
                 <li>Click "Install" button and edit configurations in general and advanced lists according to your requirements </li>
                 <li>Check the new generated stream <code>HDFS_AUDIT_LOG_ENRICHED_STREAM_{SITE_ID}</code> at Alert -> Streams</li>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index 0b7cb3d..0234a4d 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -68,7 +68,7 @@ public class TopologyCheckAppConfig implements Serializable {
     private void init(Config config) {
         this.config = config;
 
-        this.dataExtractorConfig.site = config.getString("dataExtractorConfig.site");
+        this.dataExtractorConfig.site = config.getString("siteId");
         this.dataExtractorConfig.fetchDataIntervalInSecs = config.getLong("dataExtractorConfig.fetchDataIntervalInSecs");
         this.dataExtractorConfig.parseThreadPoolSize = MAX_NUM_THREADS;
         if (config.hasPath("dataExtractorConfig.parseThreadPoolSize")) {

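TopologyCheckApp follows the same convention: the site identifier moves from dataExtractorConfig.site up to the shared top-level siteId key, so every application reads it from the same place. A minimal sketch with hypothetical values:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class SiteIdSketch {
        public static void main(String[] args) {
            Config config = ConfigFactory.parseString(
                "siteId = sandbox\n"
                    + "dataExtractorConfig.fetchDataIntervalInSecs = 300");
            String site = config.getString("siteId");  // "sandbox"
            long interval = config.getLong("dataExtractorConfig.fetchDataIntervalInSecs");
            System.out.println(site + " " + interval);
        }
    }
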
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index 476b19c..cc29ed4 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -24,12 +24,6 @@
     <configuration>
         <!-- org.apache.eagle.topology.TopologyCheckApp -->
         <property>
-            <name>dataExtractorConfig.site</name>
-            <displayName>site</displayName>
-            <description>Site</description>
-            <value>sandbox</value>
-        </property>
-        <property>
             <name>dataExtractorConfig.fetchDataIntervalInSecs</name>
             <displayName>Fetch Data Interval in Secs</displayName>
             <description>Fetch Data Interval in Secs</description>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b648011/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
index f069df5..1795849 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 {
+  siteId : "sandbox",
   appId : "topologyCheckApp",
   mode : "LOCAL",
   workers : 1,