You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/11/01 06:21:13 UTC

incubator-eagle git commit: [EAGLE-704] Update spark history config to integrate with the new application framework

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 9954b4e11 -> 6b0ed3d0d


[EAGLE-704] Update spark history config to integrate with the new application framework

https://issues.apache.org/jira/browse/EAGLE-704

Author: Zhao, Qingwen <qi...@apache.org>

Closes #591 from qingwen220/EAGLE-704.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/6b0ed3d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/6b0ed3d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/6b0ed3d0

Branch: refs/heads/master
Commit: 6b0ed3d0de8fb7302d412f3a190e4793ea0c7977
Parents: 9954b4e
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Tue Nov 1 14:21:06 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Tue Nov 1 14:21:06 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/model/AlertStreamEvent.java    |   7 +-
 .../jpm/spark/history/SparkHistoryJobApp.java   |  11 +-
 .../spark/history/SparkHistoryJobAppConfig.java |  62 +++---
 .../SparkFilesystemInputStreamReaderImpl.java   |   2 +-
 .../status/JobHistoryZKStateManager.java        |   2 +-
 .../history/storm/SparkHistoryJobParseBolt.java |   6 -
 ...spark.history.SparkHistoryJobAppProvider.xml | 205 ++++++-------------
 .../src/main/resources/application.conf         |  45 ++--
 eagle-server/pom.xml                            |   5 +
 ...org.apache.eagle.app.spi.ApplicationProvider |   3 +-
 pom.xml                                         |   2 +-
 11 files changed, 128 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index 442c885..600643b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -108,12 +108,7 @@ public class AlertStreamEvent extends StreamEvent {
                 event.put(column.getName(), null);
                 continue;
             }
-            if (column.getName().equalsIgnoreCase("timestamp") && obj instanceof Long) {
-                String eventTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(((Long) obj).longValue());
-                event.put(column.getName(), eventTime);
-            } else {
-                event.put(column.getName(), obj.toString());
-            }
+            event.put(column.getName(), obj.toString());
         }
         return event;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
index 446eb4e..8a3097d 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
@@ -36,18 +36,17 @@ public class SparkHistoryJobApp extends StormApplication {
 
         // 2. Config topology.
         TopologyBuilder topologyBuilder = new TopologyBuilder();
-        config = sparkHistoryJobAppConfig.getConfig();
+
         topologyBuilder.setSpout(
                 jobFetchSpoutName,
-                new SparkHistoryJobSpout(sparkHistoryJobAppConfig),
-                config.getInt("storm.parallelismConfig." + jobFetchSpoutName)
-        ).setNumTasks(config.getInt("storm.tasks." + jobFetchSpoutName));
+                new SparkHistoryJobSpout(sparkHistoryJobAppConfig), sparkHistoryJobAppConfig.stormConfig.numOfSpoutExecutors
+        ).setNumTasks(sparkHistoryJobAppConfig.stormConfig.numOfSpoutTasks);
 
         topologyBuilder.setBolt(
                 jobParseBoltName,
                 new SparkHistoryJobParseBolt(sparkHistoryJobAppConfig),
-                config.getInt("storm.parallelismConfig." + jobParseBoltName)
-        ).setNumTasks(config.getInt("storm.tasks." + jobParseBoltName)).shuffleGrouping(jobFetchSpoutName);
+                sparkHistoryJobAppConfig.stormConfig.numOfParserBoltExecutors
+        ).setNumTasks(sparkHistoryJobAppConfig.stormConfig.numOfParserBoltTasks).shuffleGrouping(jobFetchSpoutName);
 
         return topologyBuilder.createTopology();
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 393a97c..5049b40 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -31,9 +31,10 @@ public class SparkHistoryJobAppConfig implements Serializable {
     static final String SPARK_HISTORY_JOB_FETCH_SPOUT_NAME = "sparkHistoryJobFetchSpout";
     static final String SPARK_HISTORY_JOB_PARSE_BOLT_NAME = "sparkHistoryJobParseBolt";
 
+    static final String DEFAULT_SPARK_JOB_HISTORY_ZOOKEEPER_ROOT = "/eagle/sparkJobHistory";
+
     public ZKStateConfig zkStateConfig;
     public JobHistoryEndpointConfig jobHistoryConfig;
-    public BasicInfo info;
     public EagleInfo eagleInfo;
     public StormConfig stormConfig;
 
@@ -49,7 +50,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
         this.zkStateConfig = new ZKStateConfig();
         this.jobHistoryConfig = new JobHistoryEndpointConfig();
         this.jobHistoryConfig.hdfs = new HashMap<>();
-        this.info = new BasicInfo();
         this.eagleInfo = new EagleInfo();
         this.stormConfig = new StormConfig();
     }
@@ -62,36 +62,40 @@ public class SparkHistoryJobAppConfig implements Serializable {
     private void init(Config config) {
         this.config = config;
 
-        this.zkStateConfig.zkQuorum = config.getString("zkStateConfig.zkQuorum");
-        this.zkStateConfig.zkRetryInterval = config.getInt("zkStateConfig.zkRetryInterval");
-        this.zkStateConfig.zkRetryTimes = config.getInt("zkStateConfig.zkRetryTimes");
-        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zkStateConfig.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRoot = config.getString("zkStateConfig.zkRoot");
+        this.zkStateConfig.zkQuorum = config.getString("zookeeper.zkQuorum");
+        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
+        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_HISTORY_ZOOKEEPER_ROOT;
+        if (config.hasPath("zookeeper.zkRoot")) {
+            this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
+        }
 
-        jobHistoryConfig.historyServerUrl = config.getString("dataSourceConfig.spark.history.server.url");
-        jobHistoryConfig.historyServerUserName = config.getString("dataSourceConfig.spark.history.server.username");
-        jobHistoryConfig.historyServerUserPwd = config.getString("dataSourceConfig.spark.history.server.password");
         jobHistoryConfig.rms = config.getString("dataSourceConfig.rm.url").split(",\\s*");
         jobHistoryConfig.baseDir = config.getString("dataSourceConfig.baseDir");
         for (Map.Entry<String, ConfigValue> entry : config.getConfig("dataSourceConfig.hdfs").entrySet()) {
             this.jobHistoryConfig.hdfs.put(entry.getKey(), entry.getValue().unwrapped().toString());
         }
 
-        info.site = config.getString("basic.cluster") + "-" + config.getString("basic.dataCenter");
-        info.jobConf = config.getString("basic.jobConf.additional.info").split(",\\s*");
-
-        this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
-        this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
-        this.eagleInfo.username = config.getString("eagleProps.eagle.service.username");
-        this.eagleInfo.password = config.getString("eagleProps.eagle.service.password");
-        this.eagleInfo.timeout = config.getInt("eagleProps.eagle.service.read.timeout");
+        this.eagleInfo.host = config.getString("service.host");
+        this.eagleInfo.port = config.getInt("service.port");
+        this.eagleInfo.username = config.getString("service.username");
+        this.eagleInfo.password = config.getString("service.password");
+        this.eagleInfo.timeout = 2;
+        if (config.hasPath("service.readTimeOutSeconds")) {
+            this.eagleInfo.timeout = config.getInt("service.readTimeOutSeconds");
+        }
         this.eagleInfo.basePath = EagleServiceBaseClient.DEFAULT_BASE_PATH;
-        if (config.hasPath("eagleProps.eagle.service.basePath")) {
-            this.eagleInfo.basePath = config.getString("eagleProps.eagle.service.basePath");
+        if (config.hasPath("service.basePath")) {
+            this.eagleInfo.basePath = config.getString("service.basePath");
         }
 
-        this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
-        this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
+        this.stormConfig.siteId = config.getString("siteId");
+        this.stormConfig.spoutCrawlInterval = config.getInt("topology.spoutCrawlInterval");
+        this.stormConfig.numOfSpoutExecutors = config.getInt("topology.numOfSpoutExecutors");
+        this.stormConfig.numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks");
+        this.stormConfig.numOfParserBoltExecutors = config.getInt("topology.numOfParseBoltExecutors");
+        this.stormConfig.numOfParserBoltTasks = config.getInt("topology.numOfParserBoltTasks");
     }
 
     public static class ZKStateConfig implements Serializable {
@@ -104,21 +108,17 @@ public class SparkHistoryJobAppConfig implements Serializable {
 
     public static class JobHistoryEndpointConfig implements Serializable {
         public String[] rms;
-        public String historyServerUrl;
-        public String historyServerUserName;
-        public String historyServerUserPwd;
         public String baseDir;
         public Map<String, String> hdfs;
     }
 
-    public static class BasicInfo implements Serializable {
-        public String site;
-        public String[] jobConf;
-    }
-
     public static class StormConfig implements Serializable {
-        public int spoutPending;
+        public String siteId;
         public int spoutCrawlInterval;
+        public int numOfSpoutExecutors;
+        public int numOfSpoutTasks;
+        public int numOfParserBoltExecutors;
+        public int numOfParserBoltTasks;
     }
 
     public static class EagleInfo implements Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
index 57ced63..56998e6 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
@@ -34,7 +34,7 @@ public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReade
 
     public SparkFilesystemInputStreamReaderImpl(SparkHistoryJobAppConfig config, SparkApplicationInfo app) {
         this.config = config;
-        this.site = config.info.site;
+        this.site = config.stormConfig.siteId;
         this.app = app;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 2ce8522..28581d5 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -51,7 +51,7 @@ public class JobHistoryZKStateManager {
     }
 
     public JobHistoryZKStateManager(SparkHistoryJobAppConfig config) {
-        this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.info.site;
+        this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.stormConfig.siteId;
 
         try {
             curator = newCurator(config);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
index 9f8adc7..16f1144 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -24,10 +24,7 @@ import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader;
 import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
 import org.apache.eagle.jpm.spark.history.crawl.SparkFilesystemInputStreamReaderImpl;
 import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
-import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
 import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.SparkHistoryServerResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.model.SparkApplication;
 
 import backtype.storm.task.OutputCollector;
@@ -49,7 +46,6 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobParseBolt.class);
 
     private OutputCollector collector;
-    private ResourceFetcher historyServerFetcher;
     private SparkHistoryJobAppConfig config;
     private JobHistoryZKStateManager zkState;
     private Configuration hdfsConf;
@@ -67,8 +63,6 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
             this.hdfsConf.set(entry.getKey(), entry.getValue());
             LOG.info("conf key {}, conf value {}", entry.getKey(), entry.getValue());
         }
-        this.historyServerFetcher = new SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
-                config.jobHistoryConfig.historyServerUserName, config.jobHistoryConfig.historyServerUserPwd);
         this.zkState = new JobHistoryZKStateManager(config);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index b0d5987..8159edc 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -18,179 +18,102 @@
 
 <application>
     <type>SPARK_HISTORY_JOB_APP</type>
-    <name>Spark History Job Monitoring</name>
+    <name>Spark History Job Monitor</name>
     <version>0.5.0-incubating</version>
     <appClass>org.apache.eagle.jpm.spark.history.SparkHistoryJobApp</appClass>
     <configuration>
-        <!-- org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig -->
+        <!-- topology config -->
         <property>
-            <name>basic.cluster</name>
-            <displayName>cluster</displayName>
-            <description>Cluster Name</description>
-            <value>sandbox</value>
-        </property>
-        <property>
-            <name>basic.dataCenter</name>
-            <displayName>dataCenter</displayName>
-            <description>Data Center</description>
-            <value>sandbox</value>
-        </property>
-        <property>
-            <name>basic.jobConf.additional.info</name>
-            <displayName>jobConf.additional.info</displayName>
-            <description>Additional info in Job Configs</description>
-            <value></value>
-        </property>
-        <property>
-            <name>dataSourceConfig.zkQuorum</name>
-            <displayName>zkQuorum</displayName>
-            <description>Zookeeper Quorum</description>
-            <value>sandbox.hortonworks.com:2181</value>
-        </property>
-        <property>
-            <name>dataSourceConfig.zkRoot</name>
-            <displayName>zkRoot</displayName>
-            <description>Zookeeper Root</description>
-            <value>/sparkHistoryJob</value>
-        </property>
-        <property>
-            <name>dataSourceConfig.zkPort</name>
-            <displayName>zkPort</displayName>
-            <description>Zookeeper Port</description>
-            <value>2181</value>
-        </property>
-        <property>
-            <name>dataSourceConfig.zkSessionTimeoutMs</name>
-            <displayName>zkSessionTimeoutMs</displayName>
-            <description>Zookeeper session timeoutMs</description>
-            <value>15000</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkRetryTimes</name>
-            <displayName>zkRetryTimes</displayName>
-            <description>zookeeperConfig.zkRetryTimes</description>
-            <value>3</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkRetryInterval</name>
-            <displayName>zkRetryInterval</displayName>
-            <description>zookeeperConfig.zkRetryInterval</description>
-            <value>20000</value>
-        </property>
-        <property>
-            <name>dataSourceConfig.spark.history.server.url</name>
-            <displayName>spark.history.server.url</displayName>
-            <description>Spark History Server URL</description>
-            <value>http://sandbox.hortonworks.com:18080</value>
-        </property>
-        <property>
-            <name>dataSourceConfig.spark.history.server.username</name>
-            <displayName>spark.history.server.username</displayName>
-            <description>Spark History Server Auth Username</description>
-            <value></value>
-        </property>
-        <property>
-            <name>dataSourceConfig.spark.history.server.password</name>
-            <displayName>spark.history.server.password</displayName>
-            <description>Spark History Server Auth Password</description>
-            <value></value>
-        </property>
-        <property>
-            <name>eagleProps.eagle.service.host</name>
-            <description>eagleProps.eagle.service.host</description>
-            <value>sandbox.hortonworks.com</value>
+            <name>workers</name>
+            <displayName>topology workers</displayName>
+            <description>topology workers</description>
+            <value>1</value>
         </property>
         <property>
-            <name>eagleProps.eagle.service.port</name>
-            <description>eagleProps.eagle.service.port</description>
-            <value>9099</value>
+            <name>topology.numOfSpoutExecutors</name>
+            <displayName>spout executors</displayName>
+            <description>Parallelism of sparkHistoryJobFetchSpout </description>
+            <value>1</value>
         </property>
         <property>
-            <name>eagleProps.eagle.service.username</name>
-            <description>eagleProps.eagle.service.username</description>
-            <value>admin</value>
+            <name>topology.numOfSpoutTasks</name>
+            <displayName>spout tasks</displayName>
+            <description>Tasks Num of sparkHistoryJobFetchSpout </description>
+            <value>4</value>
         </property>
         <property>
-            <name>eagleProps.eagle.service.password</name>
-            <description>eagleProps.eagle.service.password</description>
-            <value>secret</value>
+            <name>topology.numOfParseBoltExecutors</name>
+            <displayName>parser bolt parallelism hint</displayName>
+            <description>Parallelism of sparkHistoryJobParseBolt </description>
+            <value>1</value>
         </property>
         <property>
-            <name>eagleProps.eagle.service.basePath</name>
-            <description>eagleProps.eagle.service.basePath</description>
-            <value>/rest</value>
+            <name>topology.numOfParserBoltTasks</name>
+            <displayName>parser bolt tasks</displayName>
+            <description>Tasks Num of sparkHistoryJobParseBolt</description>
+            <value>4</value>
         </property>
         <property>
-            <name>eagleProps.eagle.service.read.timeout</name>
-            <displayName>eagleProps.eagle.service.read.timeout</displayName>
-            <description>The maximum amount of time (in seconds) the app is trying to read from eagle service</description>
-            <value>2</value>
+            <name>topology.spoutCrawlInterval</name>
+            <displayName>spout crawl interval</displayName>
+            <description>Spout crawl interval (in milliseconds)</description>
+            <value>10000</value>
         </property>
         <property>
-            <name>eagleProps.eagleService.maxFlushNum</name>
-            <displayName>eagleProps.eagleService.maxFlushNum</displayName>
-            <value>500</value>
+            <name>topology.message.timeout.secs</name>
+            <displayName>topology message timeout (secs)</displayName>
+            <description>default timeout is 30s</description>
+            <value>300</value>
         </property>
+        <!-- zookeeper config -->
         <property>
-            <name>dataSourceConfig.hdfs.eventLog</name>
-            <displayName>dataSourceConfig.hdfs.eventLog</displayName>
-            <value>/spark-history</value>
+            <name>zkStateConfig.zkQuorum</name>
+            <displayName>zookeeper quorum list</displayName>
+            <description>zookeeper to store topology metadata</description>
+            <value>sandbox.hortonworks.com:2181</value>
         </property>
         <property>
-            <name>dataSourceConfig.hdfs.endpoint</name>
-            <displayName>dataSourceConfig.hdfs.endpoint</displayName>
-            <value>hdfs://sandbox.hortonworks.com:8020</value>
+            <name>zkStateConfig.zkSessionTimeoutMs</name>
+            <displayName>zookeeper session timeout (ms)</displayName>
+            <description>Zookeeper session timeoutMs</description>
+            <value>15000</value>
         </property>
         <property>
-            <name>dataSourceConfig.hdfs.keytab</name>
-            <displayName>dataSourceConfig.hdfs.keytab</displayName>
-            <value></value>
+            <name>zkStateConfig.zkRetryTimes</name>
+            <displayName>zookeeper connection retry times</displayName>
+            <description>retry times for zookeeper connection</description>
+            <value>3</value>
         </property>
         <property>
-            <name>dataSourceConfig.hdfs.principal</name>
-            <displayName>dataSourceConfig.hdfs.principal</displayName>
-            <value></value>
+            <name>zkStateConfig.zkRetryInterval</name>
+            <displayName>zookeeper connection retry interval</displayName>
+            <description>retry interval for zookeeper connection</description>
+            <value>20000</value>
         </property>
+
+        <!-- datasource config -->
         <property>
-            <name>dataSourceConfig.rmUrl</name>
-            <displayName>dataSourceConfig.rmUrl</displayName>
+            <name>dataSourceConfig.rm.url</name>
+            <displayName>resource manager url</displayName>
+            <description>url to fetch finished spark job list</description>
             <value>http://sandbox.hortonworks.com:8088</value>
+            <required>true</required>
         </property>
         <property>
-            <name>storm.pendingSpout</name>
-            <displayName>pendingSpout</displayName>
-            <value>1000</value>
-        </property>
-        <property>
-            <name>storm.spoutCrawlInterval</name>
-            <displayName>spoutCrawlInterval</displayName>
-            <description>Spout crawl interval (in milliseconds)</description>
-            <value>10000</value>
-        </property>
-        <property>
-            <name>storm.parallelismConfig.sparkHistoryJobFetchSpout</name>
-            <displayName>parallelismConfig.sparkHistoryJobFetchSpout</displayName>
-            <description>Parallelism of sparkHistoryJobFetchSpout </description>
-            <value>1</value>
-        </property>
-        <property>
-            <name>storm.tasks.sparkHistoryJobFetchSpout</name>
-            <displayName>tasks.sparkHistoryJobFetchSpout</displayName>
-            <description>Tasks Num of sparkHistoryJobFetchSpout </description>
-            <value>4</value>
-        </property>
-        <property>
-            <name>storm.parallelismConfig.sparkHistoryJobParseBolt</name>
-            <displayName>parallelismConfig.sparkHistoryJobParseBolt</displayName>
-            <description>Parallelism of sparkHistoryJobParseBolt </description>
-            <value>1</value>
+            <name>dataSourceConfig.hdfs.fs.defaultFS</name>
+            <displayName>hdfs url</displayName>
+            <description>target hdfs to crawl log data</description>
+            <value>hdfs://sandbox.hortonworks.com:8020</value>
+            <required>true</required>
         </property>
         <property>
-            <name>storm.tasks.sparkHistoryJobParseBolt</name>
-            <displayName>parallelismConfig.sparkHistoryJobParseBolt</displayName>
-            <description>Tasks Num of sparkHistoryJobParseBolt</description>
-            <value>4</value>
+            <name>dataSourceConfig.baseDir</name>
+            <displayName>hdfs base path for spark job data</displayName>
+            <description>hdfs base path for spark job data</description>
+            <value>/spark-history</value>
+            <required>true</required>
         </property>
+
         <property>
             <name>spark.defaultVal.spark.executor.memory</name>
             <displayName>spark.executor.memory</displayName>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 10da2d0..5f3fdac 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -15,22 +15,19 @@
 
 
 {
+  "siteId": "sandbox",
   "appId": "sparkHistoryJob",
   "mode": "CLUSTER",
   "workers" : 3,
   topology.message.timeout.secs: 300,
-  "basic":{
-    "cluster":"sandbox",
-    "dataCenter":"sandbox",
-    jobConf.additional.info: ""
-  },
-  "eagleProps":{
-    eagle.service.host:"sandbox.hortonworks.com",
-    eagle.service.port: 9099,
-    eagle.service.username: "admin",
-    eagle.service.password : "secret",
-    eagle.service.basePath : "/rest",
-    eagle.service.read.timeout : 2
+
+  "service":{
+    host:"sandbox.hortonworks.com",
+    port: 9099,
+    username: "admin",
+    password : "secret",
+    basePath : "/rest",
+    readTimeOutSeconds : 2
   },
   "zkStateConfig" : {
     "zkQuorum" : "sandbox.hortonworks.com:2181",
@@ -41,12 +38,9 @@
   },
 
   "dataSourceConfig":{
-    spark.history.server.url : "http://sandbox.hortonworks.com:18080",
-    spark.history.server.username : "",
-    spark.history.server.password : "",
     rm.url: "http://sandbox.hortonworks.com:8088",
     "baseDir" : "/spark-history",
-    "hdfs": {
+    hdfs: {
       fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020",
       #if not need, then do not set
       # hdfs.kerberos.principal = ,
@@ -54,20 +48,15 @@
       # ....
     }
   },
-  "storm":{
-    "pendingSpout": 1000,
-    "spoutCrawlInterval": 10000,#in ms
-    "parallelismConfig" : {
-      "sparkHistoryJobFetchSpout" : 1,
-      "sparkHistoryJobParseBolt" : 4
-    },
-    "tasks" : {
-      "sparkHistoryJobFetchSpout" : 1,
-      "sparkHistoryJobParseBolt" : 4
-    }
+  "topology": {
+    spoutCrawlInterval: 10000, #in ms
+    numOfSpoutExecutors: 1
+    numOfSpoutTasks: 4
+    numOfParseBoltExecutors: 1
+    numOfParserBoltTasks: 4
   },
   "spark":{
-    "defaultVal":{
+    "defaultVal": {
       spark.executor.memory:"1g",
       spark.driver.memory: "1g",
       spark.driver.cores:1,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index f58a166..96416f6 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -314,6 +314,11 @@
                     <artifactId>eagle-jpm-aggregation</artifactId>
                     <version>${project.version}</version>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.eagle</groupId>
+                    <artifactId>eagle-jpm-spark-history</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
 
                 <!-- App: Hadoop Queue Running Monitoring-->
                 <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
index 8b78e66..c5bc978 100644
--- a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
+++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -26,4 +26,5 @@ org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
 org.apache.eagle.jpm.aggregation.AggregationApplicationProvider
 org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider
 org.apache.eagle.topology.TopologyCheckAppProvider
-org.apache.eagle.metric.HadoopMetricMonitorAppProdiver
\ No newline at end of file
+org.apache.eagle.metric.HadoopMetricMonitorAppProdiver
+org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f2e08ef..8f2ef44 100755
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@
         <spark.core.version>1.4.0</spark.core.version>
 
         <!--  Client -->
-        <kafka-client.version>0.9.0.0</kafka-client.version>
+        <!--<kafka-client.version>0.9.0.0</kafka-client.version>-->
         <!-- Reflection -->
         <reflections.version>0.9.8</reflections.version>