You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:19 UTC

[23/52] [abbrv] incubator-eagle git commit: [EAGLE-495] Convert spark history job using application framework.

[EAGLE-495] Convert spark history job using application framework.

Author: pkuwm <ih...@gmail.com>

Closes #393 from pkuwm/EAGLE-495.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4f4fd0c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4f4fd0c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4f4fd0c4

Branch: refs/heads/master
Commit: 4f4fd0c4f606a8bc8ab83c66510aca4226becef8
Parents: a10eeb7
Author: pkuwm <ih...@gmail.com>
Authored: Mon Aug 29 10:19:09 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Mon Aug 29 10:19:09 2016 +0800

----------------------------------------------------------------------
 .../jpm/spark/crawl/JHFSparkEventReader.java    |  13 +-
 eagle-jpm/eagle-jpm-spark-history/pom.xml       |   5 +
 .../jpm/spark/history/SparkHistoryJobApp.java   |  54 ++++
 .../spark/history/SparkHistoryJobAppConfig.java | 133 +++++++++
 .../history/SparkHistoryJobAppProvider.java     |  27 ++
 .../jpm/spark/history/SparkHistoryJobMain.java  |  25 ++
 .../history/config/SparkHistoryCrawlConfig.java | 123 ---------
 .../status/JobHistoryZKStateManager.java        |   6 +-
 .../history/storm/FinishedSparkJobSpout.java    | 154 -----------
 .../history/storm/SparkHistoryJobParseBolt.java | 201 ++++++++++++++
 .../history/storm/SparkHistoryJobSpout.java     | 154 +++++++++++
 .../history/storm/SparkHistoryTopology.java     |  81 ------
 .../spark/history/storm/SparkJobParseBolt.java  | 201 --------------
 .../eagle/jpm/spark/history/storm/TestHDFS.java |  47 ----
 ...spark.history.SparkHistoryJobAppProvider.xml | 271 +++++++++++++++++++
 ...org.apache.eagle.app.spi.ApplicationProvider |  16 ++
 .../src/main/resources/application.conf         |  34 +--
 .../java/SparkHistoryJobAppProviderTest.java    |  32 +++
 .../running/SparkRunningJobAppProvider.java     |   2 +-
 19 files changed, 947 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index edb3854..1cd5a77 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.eagle.jpm.spark.crawl;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.eagle.jpm.spark.entity.*;
 import org.apache.eagle.jpm.util.*;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
@@ -109,12 +110,12 @@ public class JHFSparkEventReader {
         app.setConfig(new JobConfig());
         JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
 
-        List<String> jobConfs = conf.getStringList("basic.jobConf.additional.info");
+        String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*");
         String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
             "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
             "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
-        jobConfs.addAll(Arrays.asList(props));
-        for (String prop : jobConfs) {
+        String[] jobConf = (String[])ArrayUtils.addAll(additionalJobConf, props);
+        for (String prop : jobConf) {
             if (sparkProps.containsKey(prop)) {
                 app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
             }
@@ -698,10 +699,10 @@ public class JHFSparkEventReader {
     private EagleServiceBaseClient initiateClient() {
         String host = conf.getString("eagleProps.eagle.service.host");
         int port = conf.getInt("eagleProps.eagle.service.port");
-        String userName = conf.getString("eagleProps.eagle.service.userName");
-        String pwd = conf.getString("eagleProps.eagle.service.pwd");
+        String userName = conf.getString("eagleProps.eagle.service.username");
+        String pwd = conf.getString("eagleProps.eagle.service.password");
         client = new EagleServiceClientImpl(host, port, userName, pwd);
-        int timeout = conf.getInt("eagleProps.eagle.service.read_timeout");
+        int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
         client.getJerseyClient().setReadTimeout(timeout * 1000);
 
         return client;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/pom.xml b/eagle-jpm/eagle-jpm-spark-history/pom.xml
index e144117..1c9c8b4 100644
--- a/eagle-jpm/eagle-jpm-spark-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/pom.xml
@@ -100,6 +100,11 @@
             <artifactId>hadoop-mapreduce-client-core</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <resources>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
new file mode 100644
index 0000000..180b1e8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.spark.history.storm.SparkHistoryJobSpout;
+import org.apache.eagle.jpm.spark.history.storm.SparkHistoryJobParseBolt;
+
+public class SparkHistoryJobApp extends StormApplication {
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        // 1. Init conf
+        SparkHistoryJobAppConfig sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.getInstance(config);
+
+        final String jobFetchSpoutName = SparkHistoryJobAppConfig.SPARK_HISTORY_JOB_FETCH_SPOUT_NAME;
+        final String jobParseBoltName = SparkHistoryJobAppConfig.SPARK_HISTORY_JOB_PARSE_BOLT_NAME;
+
+        // 2. Config topology.
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        config = sparkHistoryJobAppConfig.getConfig();
+        topologyBuilder.setSpout(
+                jobFetchSpoutName,
+                new SparkHistoryJobSpout(sparkHistoryJobAppConfig),
+                config.getInt("storm.parallelismConfig." + jobFetchSpoutName)
+        ).setNumTasks(config.getInt("storm.tasks." + jobFetchSpoutName));
+
+        topologyBuilder.setBolt(
+                jobParseBoltName,
+                new SparkHistoryJobParseBolt(sparkHistoryJobAppConfig),
+                config.getInt("storm.parallelismConfig." + jobParseBoltName)
+        ).setNumTasks(config.getInt("storm.tasks." + jobParseBoltName)).shuffleGrouping(jobFetchSpoutName);
+
+        return topologyBuilder.createTopology();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
new file mode 100644
index 0000000..ed499db
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -0,0 +1,133 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.io.Serializable;
+
+public class SparkHistoryJobAppConfig implements Serializable {
+    final static String SPARK_HISTORY_JOB_FETCH_SPOUT_NAME = "sparkHistoryJobFetchSpout";
+    final static String SPARK_HISTORY_JOB_PARSE_BOLT_NAME = "sparkHistoryJobParseBolt";
+
+    public ZKStateConfig zkStateConfig;
+    public JobHistoryEndpointConfig jobHistoryConfig;
+    public HDFSConfig hdfsConfig;
+    public BasicInfo info;
+    public EagleInfo eagleInfo;
+    public StormConfig stormConfig;
+
+    private Config config;
+
+    private static SparkHistoryJobAppConfig manager = new SparkHistoryJobAppConfig();
+    
+    public Config getConfig() {
+        return config;
+    }
+
+    public SparkHistoryJobAppConfig() {
+        this.zkStateConfig = new ZKStateConfig();
+        this.jobHistoryConfig = new JobHistoryEndpointConfig();
+        this.hdfsConfig = new HDFSConfig();
+        this.info = new BasicInfo();
+        this.eagleInfo = new EagleInfo();
+        this.stormConfig = new StormConfig();
+    }
+
+    public static SparkHistoryJobAppConfig getInstance(Config config) {
+        manager.init(config);
+        return manager;
+    }
+
+    private void init(Config config) {
+        this.config = config;
+
+        this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+        this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+
+        jobHistoryConfig.historyServerUrl = config.getString("dataSourceConfig.spark.history.server.url");
+        jobHistoryConfig.historyServerUserName = config.getString("dataSourceConfig.spark.history.server.username");
+        jobHistoryConfig.historyServerUserPwd = config.getString("dataSourceConfig.spark.history.server.password");
+        jobHistoryConfig.rms = config.getString("dataSourceConfig.rm.url").split(",\\s*");
+
+        this.hdfsConfig.baseDir = config.getString("dataSourceConfig.hdfs.eventLog");
+        this.hdfsConfig.endpoint = config.getString("dataSourceConfig.hdfs.endPoint");
+        this.hdfsConfig.principal = config.getString("dataSourceConfig.hdfs.principal");
+        this.hdfsConfig.keytab = config.getString("dataSourceConfig.hdfs.keytab");
+
+        info.site = config.getString("basic.cluster") + "-" + config.getString("basic.dataCenter");
+        info.jobConf = config.getString("basic.jobConf.additional.info").split(",\\s*");
+
+        this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
+        this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
+
+        this.stormConfig.mode = config.getString("storm.mode");
+        this.stormConfig.topologyName = config.getString("storm.name");
+        this.stormConfig.workerNo = config.getInt("storm.worker.num");
+        this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
+        this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
+        this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
+    }
+
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+    }
+
+    public static class JobHistoryEndpointConfig implements Serializable {
+        public String[] rms;
+        public String historyServerUrl;
+        public String historyServerUserName;
+        public String historyServerUserPwd;
+    }
+
+    public static class HDFSConfig implements Serializable {
+        public String endpoint;
+        public String baseDir;
+        public String principal;
+        public String keytab;
+    }
+
+    public static class BasicInfo implements Serializable {
+        public String site;
+        public String[] jobConf;
+    }
+
+    public static class StormConfig implements Serializable {
+        public String mode;
+        public int workerNo;
+        public int timeoutSec;
+        public String topologyName;
+        public int spoutPending;
+        public int spoutCrawlInterval;
+    }
+
+    public static class EagleInfo implements Serializable {
+        public String host;
+        public int port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
new file mode 100644
index 0000000..343d9c2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+public class SparkHistoryJobAppProvider extends AbstractApplicationProvider<SparkHistoryJobApp> {
+    @Override
+    public SparkHistoryJobApp getApplication() {
+        return new SparkHistoryJobApp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobMain.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobMain.java
new file mode 100644
index 0000000..e47e5b2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobMain.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.history;
+
+public class SparkHistoryJobMain {
+    public static void main(String[] args) {
+        new SparkHistoryJobApp().run(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
deleted file mode 100644
index e6cd2f6..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.config;
-
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import java.io.Serializable;
-
-public class SparkHistoryCrawlConfig implements Serializable {
-    public ZKStateConfig zkStateConfig;
-    public JobHistoryEndpointConfig jobHistoryConfig;
-    public HDFSConfig hdfsConfig;
-    public BasicInfo info;
-    public EagleInfo eagleInfo;
-    public StormConfig stormConfig;
-
-    private Config config;
-    
-    public Config getConfig() {
-        return config;
-    }
-
-    public SparkHistoryCrawlConfig() {
-        this.config = ConfigFactory.load();
-
-        this.zkStateConfig = new ZKStateConfig();
-        this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
-        this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
-        this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
-        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
-
-        this.jobHistoryConfig = new JobHistoryEndpointConfig();
-        jobHistoryConfig.historyServerUrl = config.getString("dataSourceConfig.spark.history.server.url");
-        jobHistoryConfig.historyServerUserName = config.getString("dataSourceConfig.spark.history.server.username");
-        jobHistoryConfig.historyServerUserPwd = config.getString("dataSourceConfig.spark.history.server.pwd");
-        jobHistoryConfig.rms = config.getStringList("dataSourceConfig.rm.url").toArray(new String[0]);
-
-        this.hdfsConfig = new HDFSConfig();
-        this.hdfsConfig.baseDir = config.getString("dataSourceConfig.hdfs.baseDir");
-        this.hdfsConfig.endpoint = config.getString("dataSourceConfig.hdfs.endPoint");
-        this.hdfsConfig.principal = config.getString("dataSourceConfig.hdfs.principal");
-        this.hdfsConfig.keytab = config.getString("dataSourceConfig.hdfs.keytab");
-
-        this.info = new BasicInfo();
-        info.site = String.format("%s-%s",config.getString("basic.cluster"),config.getString("basic.datacenter"));
-        info.jobConf = config.getStringList("basic.jobConf.additional.info").toArray(new String[0]);
-
-        this.eagleInfo = new EagleInfo();
-        this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
-        this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
-
-        this.stormConfig = new StormConfig();
-        this.stormConfig.mode = config.getString("storm.mode");
-        this.stormConfig.topologyName = config.getString("storm.name");
-        this.stormConfig.workerNo = config.getInt("storm.workerNo");
-        this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
-        this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
-        this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
-    }
-
-    public static class ZKStateConfig implements Serializable {
-        public String zkQuorum;
-        public String zkRoot;
-        public int zkSessionTimeoutMs;
-        public int zkRetryTimes;
-        public int zkRetryInterval;
-    }
-
-    public static class JobHistoryEndpointConfig implements Serializable {
-        public String[] rms;
-        public String historyServerUrl;
-        public String historyServerUserName;
-        public String historyServerUserPwd;
-    }
-
-    public static class HDFSConfig implements Serializable {
-        public String endpoint;
-        public String baseDir;
-        public String principal;
-        public String keytab;
-    }
-
-    public static class BasicInfo implements Serializable {
-        public String site;
-        public String[] jobConf;
-    }
-
-    public static class StormConfig implements Serializable {
-        public String mode;
-        public int workerNo;
-        public int timeoutSec;
-        public String topologyName;
-        public int spoutPending;
-        public int spoutCrawlInterval;
-    }
-
-    public static class EagleInfo implements Serializable {
-        public String host;
-        public int port;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 382375f..7a95e56 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -20,7 +20,7 @@
 package org.apache.eagle.jpm.spark.history.status;
 
 import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
@@ -40,7 +40,7 @@ public class JobHistoryZKStateManager {
     private CuratorFramework curator;
     private static String START_TIMESTAMP = "lastAppTime";
 
-    private CuratorFramework newCurator(SparkHistoryCrawlConfig config) throws Exception {
+    private CuratorFramework newCurator(SparkHistoryJobAppConfig config) throws Exception {
         return CuratorFrameworkFactory.newClient(
                 config.zkStateConfig.zkQuorum,
                 config.zkStateConfig.zkSessionTimeoutMs,
@@ -49,7 +49,7 @@ public class JobHistoryZKStateManager {
         );
     }
 
-    public JobHistoryZKStateManager(SparkHistoryCrawlConfig config) {
+    public JobHistoryZKStateManager(SparkHistoryJobAppConfig config) {
         this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.info.site;
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
deleted file mode 100644
index bf04b55..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.storm;
-
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
-import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FinishedSparkJobSpout extends BaseRichSpout {
-    private static final Logger LOG = LoggerFactory.getLogger(FinishedSparkJobSpout.class);
-    private SpoutOutputCollector collector;
-    private JobHistoryZKStateManager zkState;
-    private SparkHistoryCrawlConfig config;
-    private ResourceFetcher rmFetch;
-    private long lastFinishAppTime = 0;
-    private Map<String, Integer> failTimes;
-
-    private static final int FAIL_MAX_TIMES = 5;
-
-    public FinishedSparkJobSpout(SparkHistoryCrawlConfig config) {
-        this.config = config;
-    }
-
-    @Override
-    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-        rmFetch = new RMResourceFetcher(config.jobHistoryConfig.rms);
-        this.failTimes = new HashMap<>();
-        this.collector = spoutOutputCollector;
-        this.zkState = new JobHistoryZKStateManager(config);
-        this.lastFinishAppTime = zkState.readLastFinishedTimestamp();
-        zkState.resetApplications();
-    }
-
-
-    @Override
-    public void nextTuple() {
-        LOG.info("Start to run tuple");
-        try {
-            Calendar calendar = Calendar.getInstance();
-            long fetchTime = calendar.getTimeInMillis();
-            calendar.setTimeInMillis(this.lastFinishAppTime);
-            LOG.info("Last finished time = {}", calendar.getTime());
-            if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
-                List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime));
-                //List<AppInfo> appInfos = (null != apps ? (List<AppInfo>)apps.get(0):new ArrayList<AppInfo>());
-                if (appInfos != null) {
-                    LOG.info("Get " + appInfos.size() + " from yarn resource manager.");
-                    for (AppInfo app : appInfos) {
-                        String appId = app.getId();
-                        if (!zkState.hasApplication(appId)) {
-                            zkState.addFinishedApplication(appId, app.getQueue(), app.getState(), app.getFinalStatus(), app.getUser(), app.getName());
-                        }
-                    }
-                }
-                this.lastFinishAppTime = fetchTime;
-                zkState.updateLastUpdateTime(fetchTime);
-            }
-
-            List<String> appIds = zkState.loadApplications(10);
-            for (String appId: appIds) {
-                collector.emit(new Values(appId), appId);
-                LOG.info("emit " + appId);
-                zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
-            }
-            LOG.info("{} apps sent.", appIds.size());
-
-            if (appIds.isEmpty()) {
-                this.takeRest(60);
-            }
-        } catch (Exception e) {
-            LOG.error("Fail to run next tuple", e);
-        }
-    }
-
-    private void takeRest(int seconds) {
-        try {
-            Thread.sleep(seconds * 1000);
-        } catch (InterruptedException e) {
-            LOG.warn("exception found {}", e);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-        outputFieldsDeclarer.declare(new Fields("appId"));
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        String appId = (String) msgId;
-        int failTimes = 0;
-        if (this.failTimes.containsKey(appId)) {
-            failTimes = this.failTimes.get(appId);
-        }
-        failTimes++;
-        if (failTimes >= FAIL_MAX_TIMES) {
-            this.failTimes.remove(appId);
-            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
-            LOG.error(String.format("Application %s has failed for over %s times, drop it.", appId, FAIL_MAX_TIMES));
-        } else {
-            this.failTimes.put(appId, failTimes);
-            collector.emit(new Values(appId), appId);
-            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
-        }
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        String appId = (String) msgId;
-        if (this.failTimes.containsKey(appId)) {
-            this.failTimes.remove(appId);
-        }
-
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        zkState.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
new file mode 100644
index 0000000..e88c62f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -0,0 +1,201 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
+import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.SparkHistoryServerResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.SparkApplication;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class SparkHistoryJobParseBolt extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobParseBolt.class);
+
+    private OutputCollector collector;
+    private ResourceFetcher historyServerFetcher;
+    private SparkHistoryJobAppConfig config;
+    private JobHistoryZKStateManager zkState;
+    private Configuration hdfsConf;
+
+    public SparkHistoryJobParseBolt(SparkHistoryJobAppConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.collector = outputCollector;
+        this.hdfsConf = new Configuration();
+        this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint);
+        this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        this.hdfsConf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
+        this.hdfsConf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
+        this.historyServerFetcher = new SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
+                config.jobHistoryConfig.historyServerUserName, config.jobHistoryConfig.historyServerUserPwd);
+        this.zkState = new JobHistoryZKStateManager(config);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        String appId = tuple.getStringByField("appId");
+        if (!zkState.hasApplication(appId)) {
+            //may already be processed due to some reason
+            collector.ack(tuple);
+            return;
+        }
+
+        try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
+            SparkApplicationInfo info = zkState.getApplicationInfo(appId);
+            //first try to get attempts under the application
+
+            Set<String> inprogressSet = new HashSet<String>();
+            List<String> attemptLogNames = this.getAttemptLogNameList(appId, hdfs, inprogressSet);
+
+            if (attemptLogNames.isEmpty()) {
+                LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.",
+                    appId, info.getName(), info.getUser(), info.getQueue());
+            } else {
+                for (String attemptLogName : attemptLogNames) {
+                    String extension = "";
+                    if (inprogressSet.contains(attemptLogName)) {
+                        extension = ".inprogress";
+                    }
+                    LOG.info("Attempt log name: " + attemptLogName + extension);
+
+                    Path attemptFile = getFilePath(attemptLogName, extension);
+                    JHFInputStreamReader reader = new SparkFilesystemInputStreamReaderImpl(config.info.site, info);
+                    reader.read(hdfs.open(attemptFile));
+                }
+            }
+
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            LOG.info("Successfully parse application {}", appId);
+            collector.ack(tuple);
+        } catch (Exception e) {
+            LOG.error("Fail to process application {}", appId, e);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED);
+            collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+    }
+
+    private String getAppAttemptLogName(String appId, String attemptId) {
+        if (attemptId.equals("0")) {
+            return appId;
+        }
+        return appId + "_" + attemptId;
+    }
+
+    private Path getFilePath(String appAttemptLogName, String extension) {
+        String attemptLogDir = this.config.hdfsConfig.baseDir + "/" + appAttemptLogName + extension;
+        return new Path(attemptLogDir);
+    }
+
+    private List<String> getAttemptLogNameList(String appId, FileSystem hdfs, Set<String> inprogressSet)
+            throws IOException {
+        List<String> attempts = new ArrayList<String>();
+        SparkApplication app = null;
+        /*try {
+            List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId);
+            if (apps != null) {
+                app = (SparkApplication) apps.get(0);
+                attempts = app.getAttempts();
+            }
+        } catch (Exception e) {
+            LOG.warn("Fail to get application detail from history server for appId " + appId, e);
+        }*/
+
+
+        if (null == app) {
+            // history server may not have the info, just double check.
+            // TODO: if attemptId is not "1, 2, 3,...", we should change the logic.
+            // Use getResourceManagerVersion() to compare YARN/RM versions.
+            // attemptId might be: "appId_000001"
+            int attemptId = 0;
+
+            boolean exists = true;
+            while (exists) {
+                // For Yarn version 2.4.x
+                // log name: application_1464382345557_269065_1
+                String attemptIdString = Integer.toString(attemptId);
+
+                // For Yarn version >= 2.7,
+                // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
+//                String attemptIdFormatted = String.format("%06d", attemptId);
+//
+//                // remove "application_" to get the number part of appID.
+//                String sparkAppIdNum = appId.substring(12);
+//                String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
+
+                String appAttemptLogName = this.getAppAttemptLogName(appId, attemptIdString);
+                LOG.info("Attempt ID: {}, App Attempt Log: {}", attemptIdString, appAttemptLogName);
+
+                String extension = "";
+                Path attemptFile = getFilePath(appAttemptLogName, extension);
+                extension = ".inprogress";
+                Path inprogressFile = getFilePath(appAttemptLogName, extension);
+                Path logFile = null;
+                // Check if history log exists.
+                if (hdfs.exists(attemptFile)) {
+                    logFile = attemptFile;
+                } else if (hdfs.exists(inprogressFile)) {
+                    logFile = inprogressFile;
+                    inprogressSet.add(appAttemptLogName);
+                } else if (attemptId > 0) {
+                    exists = false;
+                }
+
+                if (logFile != null) {
+                    attempts.add(appAttemptLogName);
+                }
+                attemptId++;
+            }
+        }
+        return attempts;
+    }
+
+    @Override
+    public void cleanup() {
+        super.cleanup();
+        zkState.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
new file mode 100644
index 0000000..db60744
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
@@ -0,0 +1,154 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SparkHistoryJobSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobSpout.class);
+    private SpoutOutputCollector collector;
+    private JobHistoryZKStateManager zkState;
+    private SparkHistoryJobAppConfig config;
+    private ResourceFetcher rmFetch;
+    private long lastFinishAppTime = 0;
+    private Map<String, Integer> failTimes;
+
+    private static final int FAIL_MAX_TIMES = 5;
+
+    public SparkHistoryJobSpout(SparkHistoryJobAppConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        rmFetch = new RMResourceFetcher(config.jobHistoryConfig.rms);
+        this.failTimes = new HashMap<>();
+        this.collector = spoutOutputCollector;
+        this.zkState = new JobHistoryZKStateManager(config);
+        this.lastFinishAppTime = zkState.readLastFinishedTimestamp();
+        zkState.resetApplications();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void nextTuple() {
+        LOG.info("Start to run tuple");
+        try {
+            Calendar calendar = Calendar.getInstance();
+            long fetchTime = calendar.getTimeInMillis();
+            calendar.setTimeInMillis(this.lastFinishAppTime);
+            LOG.info("Last finished time = {}", calendar.getTime());
+            if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
+                List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime));
+                //List<AppInfo> appInfos = (null != apps ? (List<AppInfo>)apps.get(0):new ArrayList<AppInfo>());
+                if (appInfos != null) {
+                    LOG.info("Get " + appInfos.size() + " from yarn resource manager.");
+                    for (AppInfo app : appInfos) {
+                        String appId = app.getId();
+                        if (!zkState.hasApplication(appId)) {
+                            zkState.addFinishedApplication(appId, app.getQueue(), app.getState(), app.getFinalStatus(), app.getUser(), app.getName());
+                        }
+                    }
+                }
+                this.lastFinishAppTime = fetchTime;
+                zkState.updateLastUpdateTime(fetchTime);
+            }
+
+            List<String> appIds = zkState.loadApplications(10);
+            for (String appId: appIds) {
+                collector.emit(new Values(appId), appId);
+                LOG.info("emit " + appId);
+                zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
+            }
+            LOG.info("{} apps sent.", appIds.size());
+
+            if (appIds.isEmpty()) {
+                this.takeRest(60);
+            }
+        } catch (Exception e) {
+            LOG.error("Fail to run next tuple", e);
+        }
+    }
+
+    private void takeRest(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            LOG.warn("exception found {}", e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("appId"));
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        String appId = (String) msgId;
+        int failTimes = 0;
+        if (this.failTimes.containsKey(appId)) {
+            failTimes = this.failTimes.get(appId);
+        }
+        failTimes++;
+        if (failTimes >= FAIL_MAX_TIMES) {
+            this.failTimes.remove(appId);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            LOG.error(String.format("Application %s has failed for over %s times, drop it.", appId, FAIL_MAX_TIMES));
+        } else {
+            this.failTimes.put(appId, failTimes);
+            collector.emit(new Values(appId), appId);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        String appId = (String) msgId;
+        if (this.failTimes.containsKey(appId)) {
+            this.failTimes.remove(appId);
+        }
+
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        zkState.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
deleted file mode 100644
index 423dbef..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.storm;
-
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-
-public class SparkHistoryTopology {
-
-    private SparkHistoryCrawlConfig sparkHistoryCrawlConfig;
-
-    public SparkHistoryTopology(SparkHistoryCrawlConfig config) {
-        this.sparkHistoryCrawlConfig = config;
-    }
-
-    public TopologyBuilder getBuilder() {
-        TopologyBuilder builder = new TopologyBuilder();
-        String spoutName = "sparkHistoryJobSpout";
-        String boltName = "sparkHistoryJobBolt";
-        com.typesafe.config.Config config = this.sparkHistoryCrawlConfig.getConfig();
-        builder.setSpout(spoutName,
-                new FinishedSparkJobSpout(sparkHistoryCrawlConfig),
-                config.getInt("storm.parallelismConfig." + spoutName)
-        ).setNumTasks(config.getInt("storm.tasks." + spoutName));
-
-        builder.setBolt(boltName,
-                new SparkJobParseBolt(sparkHistoryCrawlConfig),
-                config.getInt("storm.parallelismConfig." + boltName)
-        ).setNumTasks(config.getInt("storm.tasks." + boltName)).shuffleGrouping(spoutName);
-        return builder;
-    }
-
-
-    public static void main(String[] args) {
-        try {
-            SparkHistoryCrawlConfig crawlConfig = new SparkHistoryCrawlConfig();
-
-            Config conf = new Config();
-            conf.setNumWorkers(crawlConfig.stormConfig.workerNo);
-            conf.setMessageTimeoutSecs(crawlConfig.stormConfig.timeoutSec);
-            //conf.setMaxSpoutPending(crawlConfig.stormConfig.spoutPending);
-            //conf.put(Config.TOPOLOGY_DEBUG, true);
-
-
-            if (crawlConfig.stormConfig.mode.equals("local")) {
-                LocalCluster cluster = new LocalCluster();
-                cluster.submitTopology(
-                        crawlConfig.stormConfig.topologyName,
-                        conf,
-                        new SparkHistoryTopology(crawlConfig).getBuilder().createTopology());
-            } else {
-                StormSubmitter.submitTopology(
-                        crawlConfig.stormConfig.topologyName,
-                        conf,
-                        new SparkHistoryTopology(crawlConfig).getBuilder().createTopology());
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
deleted file mode 100644
index c515d32..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.storm;
-
-import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
-import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
-import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
-import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.SparkHistoryServerResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.model.SparkApplication;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class SparkJobParseBolt extends BaseRichBolt {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SparkJobParseBolt.class);
-
-    private OutputCollector collector;
-    private ResourceFetcher historyServerFetcher;
-    private SparkHistoryCrawlConfig config;
-    private JobHistoryZKStateManager zkState;
-    private Configuration hdfsConf;
-
-    public SparkJobParseBolt(SparkHistoryCrawlConfig config) {
-        this.config = config;
-    }
-
-    @Override
-    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-        this.collector = outputCollector;
-        this.hdfsConf = new Configuration();
-        this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint);
-        this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
-        this.hdfsConf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
-        this.hdfsConf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
-        this.historyServerFetcher = new SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
-                config.jobHistoryConfig.historyServerUserName, config.jobHistoryConfig.historyServerUserPwd);
-        this.zkState = new JobHistoryZKStateManager(config);
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        String appId = tuple.getStringByField("appId");
-        if (!zkState.hasApplication(appId)) {
-            //may already be processed due to some reason
-            collector.ack(tuple);
-            return;
-        }
-
-        try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
-            SparkApplicationInfo info = zkState.getApplicationInfo(appId);
-            //first try to get attempts under the application
-
-            Set<String> inprogressSet = new HashSet<String>();
-            List<String> attemptLogNames = this.getAttemptLogNameList(appId, hdfs, inprogressSet);
-
-            if (attemptLogNames.isEmpty()) {
-                LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.",
-                    appId, info.getName(), info.getUser(), info.getQueue());
-            } else {
-                for (String attemptLogName : attemptLogNames) {
-                    String extension = "";
-                    if (inprogressSet.contains(attemptLogName)) {
-                        extension = ".inprogress";
-                    }
-                    LOG.info("Attempt log name: " + attemptLogName + extension);
-
-                    Path attemptFile = getFilePath(attemptLogName, extension);
-                    JHFInputStreamReader reader = new SparkFilesystemInputStreamReaderImpl(config.info.site, info);
-                    reader.read(hdfs.open(attemptFile));
-                }
-            }
-
-            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
-            LOG.info("Successfully parse application {}", appId);
-            collector.ack(tuple);
-        } catch (Exception e) {
-            LOG.error("Fail to process application {}", appId, e);
-            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED);
-            collector.fail(tuple);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-
-    }
-
-    private String getAppAttemptLogName(String appId, String attemptId) {
-        if (attemptId.equals("0")) {
-            return appId;
-        }
-        return appId + "_" + attemptId;
-    }
-
-    private Path getFilePath(String appAttemptLogName, String extension) {
-        String attemptLogDir = this.config.hdfsConfig.baseDir + "/" + appAttemptLogName + extension;
-        return new Path(attemptLogDir);
-    }
-
-    private List<String> getAttemptLogNameList(String appId, FileSystem hdfs, Set<String> inprogressSet)
-            throws IOException {
-        List<String> attempts = new ArrayList<String>();
-        SparkApplication app = null;
-        /*try {
-            List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId);
-            if (apps != null) {
-                app = (SparkApplication) apps.get(0);
-                attempts = app.getAttempts();
-            }
-        } catch (Exception e) {
-            LOG.warn("Fail to get application detail from history server for appId " + appId, e);
-        }*/
-
-
-        if (null == app) {
-            // history server may not have the info, just double check.
-            // TODO: if attemptId is not "1, 2, 3,...", we should change the logic.
-            // Use getResourceManagerVersion() to compare YARN/RM versions.
-            // attemptId might be: "appId_000001"
-            int attemptId = 0;
-
-            boolean exists = true;
-            while (exists) {
-                // For Yarn version 2.4.x
-                // log name: application_1464382345557_269065_1
-                String attemptIdString = Integer.toString(attemptId);
-
-                // For Yarn version >= 2.7,
-                // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
-//                String attemptIdFormatted = String.format("%06d", attemptId);
-//
-//                // remove "application_" to get the number part of appID.
-//                String sparkAppIdNum = appId.substring(12);
-//                String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
-
-                String appAttemptLogName = this.getAppAttemptLogName(appId, attemptIdString);
-                LOG.info("Attempt ID: {}, App Attempt Log: {}", attemptIdString, appAttemptLogName);
-
-                String extension = "";
-                Path attemptFile = getFilePath(appAttemptLogName, extension);
-                extension = ".inprogress";
-                Path inprogressFile = getFilePath(appAttemptLogName, extension);
-                Path logFile = null;
-                // Check if history log exists.
-                if (hdfs.exists(attemptFile)) {
-                    logFile = attemptFile;
-                } else if (hdfs.exists(inprogressFile)) {
-                    logFile = inprogressFile;
-                    inprogressSet.add(appAttemptLogName);
-                } else if (attemptId > 0) {
-                    exists = false;
-                }
-
-                if (logFile != null) {
-                    attempts.add(appAttemptLogName);
-                }
-                attemptId++;
-            }
-        }
-        return attempts;
-    }
-
-    @Override
-    public void cleanup() {
-        super.cleanup();
-        zkState.close();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
deleted file mode 100644
index f4284e1..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.storm;
-
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestHDFS {
-    private static final Logger LOG = LoggerFactory.getLogger(TestHDFS.class);
-
-    public static void main(String[] args) throws Exception {
-        SparkHistoryCrawlConfig config = new SparkHistoryCrawlConfig();
-
-        Configuration conf  = new Configuration();
-        conf.set("fs.defaultFS", config.hdfsConfig.endpoint);
-        conf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
-        conf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
-
-        FileSystem hdfs = HDFSUtil.getFileSystem(conf);
-        Path path = new Path("/logs/spark-events/local-1463002514438");
-        boolean exists = hdfs.exists(path);
-        LOG.info("File exists:{}", exists);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
new file mode 100644
index 0000000..26842b8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -0,0 +1,271 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>SPARK_HISTORY_JOB_APP</type>
+    <name>Spark History Job Monitoring</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.jpm.spark.history.SparkHistoryJobApp</appClass>
+    <viewPath>/apps/jpm</viewPath>
+    <configuration>
+        <!-- org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig -->
+        <property>
+            <name>basic.cluster</name>
+            <displayName>cluster</displayName>
+            <description>Cluster Name</description>
+            <value>sandbox</value>
+        </property>
+        <property>
+            <name>basic.dataCenter</name>
+            <displayName>dataCenter</displayName>
+            <description>Data Center</description>
+            <value>sandbox</value>
+        </property>
+        <property>
+            <name>basic.jobConf.additional.info</name>
+            <displayName>jobConf.additional.info</displayName>
+            <description>Additional info in Job Configs</description>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkQuorum</name>
+            <displayName>zkQuorum</displayName>
+            <description>Zookeeper Quorum</description>
+            <value>sandbox.hortonworks.com:2181</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRoot</name>
+            <displayName>zkRoot</displayName>
+            <description>Zookeeper Root</description>
+            <value>/sparkHistoryJob</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkPort</name>
+            <displayName>zkPort</displayName>
+            <description>Zookeeper Port</description>
+            <value>2181</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkSessionTimeoutMs</name>
+            <displayName>zkSessionTimeoutMs</displayName>
+            <description>Zookeeper session timeoutMs</description>
+            <value>15000</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkRetryTimes</name>
+            <displayName>zkRetryTimes</displayName>
+            <description>zookeeperConfig.zkRetryTimes</description>
+            <value>3</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkRetryInterval</name>
+            <displayName>zkRetryInterval</displayName>
+            <description>zookeeperConfig.zkRetryInterval</description>
+            <value>20000</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.spark.history.server.url</name>
+            <displayName>spark.history.server.url</displayName>
+            <description>Spark History Server URL</description>
+            <value>http://sandbox.hortonworks.com:18080</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.spark.history.server.username</name>
+            <displayName>spark.history.server.username</displayName>
+            <description>Spark History Server Auth Username</description>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.spark.history.server.password</name>
+            <displayName>spark.history.server.password</displayName>
+            <description>Spark History Server Auth Password</description>
+            <value></value>
+        </property>
+        <property>
+            <name>eagleProps.eagle.service.host</name>
+            <description>eagleProps.eagle.service.host</description>
+            <value>sandbox.hortonworks.com</value>
+        </property>
+        <property>
+            <name>eagleProps.eagle.service.port</name>
+            <description>eagleProps.eagle.service.port</description>
+            <value>9099</value>
+        </property>
+        <property>
+            <name>eagleProps.eagle.service.username</name>
+            <description>eagleProps.eagle.service.username</description>
+            <value>admin</value>
+        </property>
+        <property>
+            <name>eagleProps.eagle.service.password</name>
+            <description>eagleProps.eagle.service.password</description>
+            <value>secret</value>
+        </property>
+        <property>
+            <name>eagleProps.eagle.service.read.timeout</name>
+            <displayName>eagleProps.eagle.service.read.timeout</displayName>
+            <description>The maximum amount of time (in seconds) the app is trying to read from eagle service</description>
+            <value>2</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.maxFlushNum</name>
+            <displayName>eagleProps.eagleService.maxFlushNum</displayName>
+            <value>500</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hdfs.eventLog</name>
+            <displayName>dataSourceConfig.hdfs.eventLog</displayName>
+            <value>/spark-history</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hdfs.endpoint</name>
+            <displayName>dataSourceConfig.hdfs.endpoint</displayName>
+            <value>hdfs://sandbox.hortonworks.com:8020</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hdfs.keytab</name>
+            <displayName>dataSourceConfig.hdfs.keytab</displayName>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hdfs.principal</name>
+            <displayName>dataSourceConfig.hdfs.principal</displayName>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.rmUrl</name>
+            <displayName>dataSourceConfig.rmUrl</displayName>
+            <value>http://sandbox.hortonworks.com:8088</value>
+        </property>
+        <property>
+            <name>storm.mode</name>
+            <displayName>mode</displayName>
+            <description>Storm Mode: local or cluster</description>
+            <value>local</value>
+        </property>
+        <property>
+            <name>storm.worker.num</name>
+            <displayName>worker.num</displayName>
+            <description>The number of workers</description>
+            <value>2</value>
+        </property>
+        <property>
+            <name>name</name>
+            <displayName>name</displayName>
+            <description>Name of the topology</description>
+            <value>sparkHistoryJob</value>
+        </property>
+        <property>
+            <name>storm.messageTimeoutSec</name>
+            <displayName>messageTimeoutSec</displayName>
+            <description>Message timeout (in seconds)</description>
+            <value>3000</value>
+        </property>
+        <property>
+            <name>storm.pendingSpout</name>
+            <displayName>pendingSpout</displayName>
+            <value>1000</value>
+        </property>
+        <property>
+            <name>storm.spoutCrawlInterval</name>
+            <displayName>spoutCrawlInterval</displayName>
+            <description>Spout crawl interval (in milliseconds)</description>
+            <value>10000</value>
+        </property>
+        <property>
+            <name>storm.parallelismConfig.sparkHistoryJobFetchSpout</name>
+            <displayName>parallelismConfig.sparkHistoryJobFetchSpout</displayName>
+            <description>Parallelism of sparkHistoryJobFetchSpout </description>
+            <value>1</value>
+        </property>
+        <property>
+            <name>storm.tasks.sparkHistoryJobFetchSpout</name>
+            <displayName>tasks.sparkHistoryJobFetchSpout</displayName>
+            <description>Tasks Num of sparkHistoryJobFetchSpout </description>
+            <value>4</value>
+        </property>
+        <property>
+            <name>storm.parallelismConfig.sparkHistoryJobParseBolt</name>
+            <displayName>parallelismConfig.sparkHistoryJobParseBolt</displayName>
+            <description>Parallelism of sparkHistoryJobParseBolt </description>
+            <value>1</value>
+        </property>
+        <property>
+            <name>storm.tasks.sparkHistoryJobParseBolt</name>
+            <displayName>parallelismConfig.sparkHistoryJobParseBolt</displayName>
+            <description>Tasks Num of sparkHistoryJobParseBolt</description>
+            <value>4</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.executor.memory</name>
+            <displayName>spark.executor.memory</displayName>
+            <value>1g</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.driver.memory</name>
+            <displayName>spark.driver.memory</displayName>
+            <value>1g</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.driver.cores</name>
+            <displayName>spark.driver.cores</displayName>
+            <value>1</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.executor.cores</name>
+            <displayName>spark.executor.cores</displayName>
+            <value>1</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.yarn.am.memory</name>
+            <displayName>spark.yarn.am.memory</displayName>
+            <value>512m</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.yarn.am.cores</name>
+            <displayName>spark.yarn.am.cores</displayName>
+            <value>1</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.yarn.executor.memoryOverhead.factor</name>
+            <displayName>spark.yarn.executor.memoryOverhead.factor</displayName>
+            <value>10</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.yarn.driver.memoryOverhead.factor</name>
+            <displayName>spark.yarn.driver.memoryOverhead.factor</displayName>
+            <value>10</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.yarn.am.memoryOverhead.factor</name>
+            <displayName>spark.yarn.am.memoryOverhead.factor</displayName>
+            <value>10</value>
+        </property>
+        <property>
+            <name>spark.defaultVal.spark.yarn.overhead.min</name>
+            <displayName>spark.yarn.overhead.min</displayName>
+            <value>384m</value>
+        </property>
+    </configuration>
+    <docs>
+        <install>
+        </install>
+        <uninstall>
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..7adb50c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 65aaa36..289c6f7 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -18,14 +18,14 @@
   "basic":{
     "cluster":"sandbox",
     "datacenter":"sandbox",
-    jobConf.additional.info: []
+    jobConf.additional.info: ""
   },
   "eagleProps":{
     eagle.service.host:"sandbox.hortonworks.com",
     eagle.service.port: 9099,
-    eagle.service.userName: "admin",
-    eagle.service.pwd : "secret",
-    eagle.service.read_timeout : 2
+    eagle.service.username: "admin",
+    eagle.service.password : "secret",
+    eagle.service.read.timeout : 2
   },
   "dataSourceConfig":{
     "zkQuorum" : "sandbox.hortonworks.com:2181",
@@ -35,29 +35,29 @@
     "zkRetryInterval" : 20000,
     spark.history.server.url : "http://sandbox.hortonworks.com:18080",
     spark.history.server.username : "",
-    spark.history.server.pwd : "",
-    rm.url:["http://sandbox.hortonworks.com:8088"] ,
+    spark.history.server.password : "",
+    rm.url: "http://sandbox.hortonworks.com:8088",
     "hdfs": {
-      "baseDir": "/spark-history",
+      "eventLog": "/spark-history",
       "endPoint": "hdfs://sandbox.hortonworks.com:8020",
       "principal": "",
       "keytab" : ""
-      }
+    }
   },
   "storm":{
+    worker.num: 2,
     "mode": "local",
-    "workerNo": 2,
-    "name":"sparkHistory",
-    "messageTimeoutSec":  3000,
+    "name":"sparkHistoryJob",
+    "messageTimeoutSec": 3000,
     "pendingSpout": 1000,
     "spoutCrawlInterval": 10000,#in ms
     "parallelismConfig" : {
-      "sparkHistoryJobSpout" : 1,
-      "sparkHistoryJobBolt" : 6
+      "sparkHistoryJobFetchSpout" : 1,
+      "sparkHistoryJobParseBolt" : 4
     },
     "tasks" : {
-      "sparkHistoryJobSpout" : 1,
-      "sparkHistoryJobBolt" : 6
+      "sparkHistoryJobFetchSpout" : 1,
+      "sparkHistoryJobParseBolt" : 4
     }
   },
   "spark":{
@@ -73,5 +73,7 @@
       spark.yarn.am.memoryOverhead.factor: 10,
       spark.yarn.overhead.min: "384m"
     }
-  }
+  },
+  "appId": "sparkHistoryJob",
+  "mode": "LOCAL"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/test/java/SparkHistoryJobAppProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/test/java/SparkHistoryJobAppProviderTest.java b/eagle-jpm/eagle-jpm-spark-history/src/test/java/SparkHistoryJobAppProviderTest.java
new file mode 100644
index 0000000..cf6e932
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/test/java/SparkHistoryJobAppProviderTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider;
+import org.junit.Test;
+
+public class SparkHistoryJobAppProviderTest extends ApplicationTestBase {
+    @Inject
+    ApplicationSimulator simulator;
+
+    @Test
+    public void testRunWithProvider(){
+        simulator.start(SparkHistoryJobAppProvider.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
index 3d20af7..dc79b15 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
@@ -23,4 +23,4 @@ public class SparkRunningJobAppProvider extends AbstractApplicationProvider<Spar
     public SparkRunningJobApp getApplication() {
         return new SparkRunningJobApp();
     }
-}
\ No newline at end of file
+}