You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/05 18:07:41 UTC

[2/8] incubator-eagle git commit: EAGLE-276 eagle support for mr & spark history job monitoring mr & spark job history monitoring

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
new file mode 100644
index 0000000..4e6bf03
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
@@ -0,0 +1,178 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.crawl.SparkHistoryFileInputStreamReaderImpl;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.SparkHistoryServerResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
+import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplicationAttempt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class SparkJobParseBolt extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkJobParseBolt.class);
+
+    private OutputCollector collector;
+    private ResourceFetcher historyServerFetcher;
+    private SparkHistoryCrawlConfig config;
+    private JobHistoryZKStateManager zkState;
+    private Configuration hdfsConf;
+
+    public SparkJobParseBolt(SparkHistoryCrawlConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.collector = outputCollector;
+        this.hdfsConf  = new Configuration();
+        this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint);
+        this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        this.hdfsConf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
+        this.hdfsConf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
+        this.historyServerFetcher = new SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
+                config.jobHistoryConfig.historyServerUserName, config.jobHistoryConfig.historyServerUserPwd);
+        this.zkState = new JobHistoryZKStateManager(config);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        String appId = tuple.getStringByField("appId");
+        FileSystem hdfs = null;
+        try {
+            if (!zkState.hasApplication(appId)) {
+                //may already be processed due to some reason
+                collector.ack(tuple);
+                return;
+            }
+
+            SparkApplicationInfo info = zkState.getApplicationInfo(appId);
+            //first try to get attempts under the application
+            List<SparkApplicationAttempt> attempts = this.getAttemptList(appId);
+
+            if (attempts.isEmpty()) {
+                LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.", appId, info.getName(), info.getUser(), info.getQueue());
+            } else {
+                hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
+                for (SparkApplicationAttempt attempt : attempts) {
+                    Path attemptFile = new Path(this.config.hdfsConfig.baseDir + "/" + this.getAppAttemptLogName(appId, attempt.getAttemptId()));
+                    JHFInputStreamReader reader = new SparkHistoryFileInputStreamReaderImpl(config.info.site , info);
+                    reader.read(hdfs.open(attemptFile));
+                }
+            }
+
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            LOG.info("Successfully parse application {}", appId);
+            collector.ack(tuple);
+        } catch (Exception e) {
+            LOG.error("Fail to process application {}", appId, e);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED);
+            collector.fail(tuple);
+        } finally {
+            if (null != hdfs) {
+                try {
+                    hdfs.close();
+                } catch (Exception e) {
+                    LOG.error("Fail to close hdfs");
+                }
+            }
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+    }
+
+    private String getAppAttemptLogName(String appId, String attemptId) {
+        return String.format("%s_%s", appId, attemptId);
+    }
+
+    private List<SparkApplicationAttempt> getAttemptList(String appId) throws IOException {
+        FileSystem hdfs = null;
+        List<SparkApplicationAttempt> attempts = new ArrayList<>();
+        try {
+
+            SparkApplication app = null;
+            /*try {
+                List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId);
+                if (apps != null) {
+                    app = (SparkApplication) apps.get(0);
+                    attempts = app.getAttempts();
+                }
+            } catch (Exception e) {
+                LOG.warn("Fail to get application detail from history server for appId " + appId, e);
+            }*/
+
+
+            if (null == app) {
+                //history server may not have the info, just double check
+                hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
+                Integer attemptId = 1;
+
+                boolean exists = true;
+                while (exists) {
+                    Path attemptFile = new Path(this.config.hdfsConfig.baseDir + "/" + this.getAppAttemptLogName(appId, attemptId.toString()));
+                    if (hdfs.exists(attemptFile)) {
+                        SparkApplicationAttempt attempt = new SparkApplicationAttempt();
+                        attempt.setAttemptId(attemptId.toString());
+                        attempts.add(attempt);
+                        attemptId++;
+                    } else {
+                        exists = false;
+                    }
+                }
+            }
+            return attempts;
+        } finally {
+            if (null != hdfs) {
+                hdfs.close();
+            }
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        super.cleanup();
+        zkState.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
new file mode 100644
index 0000000..eb30f5e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
@@ -0,0 +1,47 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHDFS {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestHDFS.class);
+    public static void main(String[] args) throws Exception{
+        SparkHistoryCrawlConfig config = new SparkHistoryCrawlConfig();
+
+        Configuration conf  = new Configuration();
+        conf.set("fs.defaultFS", config.hdfsConfig.endpoint);
+        conf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
+        conf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
+
+        FileSystem hdfs = HDFSUtil.getFileSystem(conf);
+        Path path = new Path("/logs/spark-events/local-1463002514438");
+        boolean exists = hdfs.exists(path);
+        LOG.info("File exists:{}", exists);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..21686a6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DistributedFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
+org.apache.hadoop.hdfs.web.WebHdfsFileSystem
+org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
new file mode 100644
index 0000000..36f0836
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+{
+  "basic":{
+    "cluster":"sandbox",
+    "datacenter":"sandbox",
+    jobConf.additional.info: []
+  },
+  "eagleProps":{
+    eagle.service.host:"sandbox.hortonworks.com",
+    eagle.service.port: 9099,
+    eagle.service.userName: "admin",
+    eagle.service.pwd : "secret",
+    eagle.service.read_timeout : 2
+  },
+  "dataSourceConfig":{
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkRoot" : "/sparkJobHistory",
+    "zkSessionTimeoutMs" : 15000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 20000,
+    spark.history.server.url : "http://sandbox.hortonworks.com:18080/",
+    spark.history.server.username : "",
+    spark.history.server.pwd : "",
+    rm.url:["http://sandbox.hortonworks.com:8088"] ,
+    "hdfs": {
+      "baseDir": "/logs/spark-events",
+      "endPoint": "hdfs://sandbox.hortonworks.com:8020",
+      "principal": "",
+      "keytab" : ""
+      }
+  },
+  "storm":{
+    "mode": "local",
+    "workerNo": 2,
+    "name":"sparkHistory",
+    "messageTimeoutSec":  3000,
+    "pendingSpout": 1000,
+    "spoutCrawlInterval": 10000,#in ms
+    "parallelismConfig" : {
+      "sparkHistoryJobSpout" : 1,
+      "sparkHistoryJobBolt" : 6
+    },
+    "tasks" : {
+      "sparkHistoryJobSpout" : 1,
+      "sparkHistoryJobBolt" : 6
+    }
+  },
+  "spark":{
+    "defaultVal":{
+      spark.executor.memory:"1g",
+      spark.driver.memory: "1g",
+      spark.driver.cores:1,
+      spark.executor.cores:1,
+      spark.yarn.am.memory:"512m",
+      spark.yarn.am.cores:1,
+      spark.yarn.executor.memoryOverhead.factor: 10,
+      spark.yarn.driver.memoryOverhead.factor: 10,
+      spark.yarn.am.memoryOverhead.factor: 10,
+      spark.yarn.overhead.min: "384m"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6b8c8d6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=../logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml
new file mode 100644
index 0000000..7bf90d4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.eagle</groupId>
+    <artifactId>eagle-jpm-parent</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>eagle-jpm-spark-running</artifactId>
+  <name>eagle-jpm-spark-running</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+  	<dependency>
+  		<groupId>org.slf4j</groupId>
+  		<artifactId>slf4j-api</artifactId>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.eagle</groupId>
+  		<artifactId>eagle-stream-process-api</artifactId>
+        <version>${project.version}</version>
+  	</dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-stream-process-base</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+  	<dependency>
+  		<groupId>org.apache.eagle</groupId>
+  		<artifactId>eagle-job-common</artifactId>
+  		<version>${project.version}</version>
+  	</dependency>  	  	
+  	<dependency>
+		<groupId>org.jsoup</groupId>
+		<artifactId>jsoup</artifactId>
+	</dependency>
+  	<dependency>
+  		<groupId>org.apache.storm</groupId>
+  		<artifactId>storm-core</artifactId>
+  		<exclusions>
+      		<exclusion>
+      			<groupId>ch.qos.logback</groupId>
+        		<artifactId>logback-classic</artifactId>
+      		</exclusion>
+      	</exclusions>
+  	</dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/pom.xml b/eagle-jpm/eagle-jpm-util/pom.xml
new file mode 100644
index 0000000..ed7f658
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.eagle</groupId>
+    <artifactId>eagle-jpm-parent</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>eagle-jpm-util</artifactId>
+  <name>eagle-jpm-util</name>
+  <url>http://maven.apache.org</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+      <version>1.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${storm.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>log4j-over-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>1.9</version>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
new file mode 100644
index 0000000..0792f15
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+public class Constants {
+
+    public final static String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
+    public final static String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
+    public final static String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
+    public final static String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
+    public final static String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";
+
+    public static final String APPLICATION_PREFIX = "application";
+    public static final String JOB_PREFIX = "job";
+    public static final String V2_APPS_URL = "ws/v1/cluster/apps";
+    public static final String ANONYMOUS_PARAMETER = "anonymous=true";
+
+    public static final String V2_APPS_RUNNING_URL = "ws/v1/cluster/apps?state=RUNNING";
+    public static final String V2_APPS_COMPLETED_URL = "ws/v1/cluster/apps?state=FINISHED";
+
+    public static final String SPARK_APPS_URL ="api/v1/applications";
+
+    public enum CompressionType {
+        GZIP, NONE
+    }
+    public enum JobState {
+        RUNNING, COMPLETED, ALL
+    }
+
+    public enum ResourceType {
+         COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
new file mode 100644
index 0000000..8adb001
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
@@ -0,0 +1,44 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+
+public class HDFSUtil {
+
+    public static FileSystem getFileSystem(Configuration conf) throws IOException {
+        HDFSUtil.login(conf);
+       return FileSystem.get(conf);
+    }
+
+    public static void login(Configuration kConfig) throws IOException {
+        if(kConfig.get("hdfs.kerberos.principal") == null || kConfig.get("hdfs.kerberos.principal").isEmpty()){
+            return;
+        }
+       kConfig.setBoolean("hadoop.security.authorization", true);
+       kConfig.set("hadoop.security.authentication", "kerberos");
+       UserGroupInformation.setConfiguration(kConfig);
+       UserGroupInformation.loginUserFromKeytab(kConfig.get("hdfs.kerberos.principal"), kConfig.get("hdfs.keytab.file"));
+     }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
new file mode 100644
index 0000000..8080147
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class JSONUtil {
+
+    public static String getString(JSONObject obj, String field) {
+        if (obj.containsKey(field)) {
+            return (String) obj.get(field);
+        }
+        return null;
+    }
+
+    public static Integer getInt(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return ((Long)obj.get(field)).intValue();
+        }
+        return null;
+    }
+
+    public static Long getLong(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return (Long)obj.get(field);
+        }
+        return null;
+    }
+
+    public static Boolean getBoolean(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return (Boolean)obj.get(field);
+        }
+        return null;
+    }
+
+    public static JSONObject getJSONObject(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return (JSONObject)obj.get(field);
+        }
+        return null;
+    }
+
+    public static JSONArray getJSONArray(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return (JSONArray)obj.get(field);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
new file mode 100644
index 0000000..c5cc82f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class JobNameNormalization {
+	private static Logger logger = LoggerFactory.getLogger(JobNameNormalization.class);
+	private static JobNameNormalization instance = new JobNameNormalization();
+	private static final String JOB_NAME_NORMALIZATION_RULES_KEY = "job.name.normalization.rules.key";
+	private static final String PARAMETERIZED_PREFIX = "\\$";
+	private static final String MULTIPLE_RULE_DILIMITER = ";";
+	/**
+	 * map from source string to target string
+	 * source string is regular expression, for example ^(.*)[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}$
+	 * target string is parameterized string, for example $1, $2
+	 */
+	private List<JobNameNormalizationRule> _rules = new ArrayList<JobNameNormalizationRule>();
+	
+	private enum NormalizationOp{
+		REPLACE("=>");
+		private String value;
+		private NormalizationOp(String value){
+			this.value = value;
+		}
+		public String toString(){
+			return value;
+		}
+	}
+	
+	static class JobNameNormalizationRule{
+		Pattern pattern;
+		NormalizationOp op;
+		String target;
+	}
+	
+	private JobNameNormalization(){
+		try{
+			// load normalization rules
+			Config conf = ConfigFactory.load();
+			String key = JOB_NAME_NORMALIZATION_RULES_KEY.toLowerCase();
+			String value = conf.getString(key);
+			if(value == null){
+				logger.info("no job name normalization rules are loaded");
+				return;
+			}
+			// multiple rules are concatenated with semicolon, i.e. ;
+			String rules[] = value.split(MULTIPLE_RULE_DILIMITER);
+			for(String rule : rules){
+				rule = rule.trim();
+				logger.info("jobNormalizationRule is loaded " + rule);
+				addRule(rule);
+			}
+		}catch(Exception ex){
+			logger.error("fail loading job name normalization rules", ex);
+			throw new RuntimeException(ex);
+		}
+	}
+	
+	public static JobNameNormalization getInstance(){
+		return instance;
+	}
+	
+	private void addRule(String rule){
+		for(NormalizationOp op : NormalizationOp.values()){
+			// split the rule to be source and target string
+			String elements[] = rule.split(op.toString());
+			if(elements == null || elements.length != 2) return;
+			JobNameNormalizationRule r = new JobNameNormalizationRule();
+			r.pattern = Pattern.compile(elements[0].trim());
+			r.op = op;
+			r.target = elements[1].trim();
+			_rules.add(r);
+			break;  //once one Op is matched, exit
+		}
+	
+	}
+	
+	public String normalize(String jobName){
+		String normalizedJobName = jobName;
+		// go through each rules and do actions
+		for(JobNameNormalizationRule rule : _rules){
+			Pattern p = rule.pattern;
+			Matcher m = p.matcher(jobName);
+			if(m.find()){
+				normalizedJobName = rule.target;
+				int c = m.groupCount();
+				for(int i=1; i<c+1; i++){
+					normalizedJobName = normalizedJobName.replaceAll(PARAMETERIZED_PREFIX+String.valueOf(i), m.group(i));
+				}
+			}
+		}
+		return normalizedJobName;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
new file mode 100644
index 0000000..35014b1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+public class SparkEntityConstant {
+
+    public enum SPARK_STAGE_STATUS{
+        COMPLETE, FAILED
+    }
+
+    public enum SPARK_JOB_STATUS{
+        SUCCEEDED, FAILED
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
new file mode 100644
index 0000000..1d38eea
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+public enum SparkJobTagName {
+    SITE("site"),
+    SPARK_APP_ID("sprkAppId"),
+    SPARK_APP_ATTEMPT_ID("sprkAppAttemptId"),
+    SPARK_APP_NAME("sprkAppName"),
+    SPARK_APP_NORM_NAME("normSprkAppName"),
+    SPARK_JOB_ID("jobId"),
+    SPARK_SATGE_ID("stageId"),
+    SPARK_STAGE_ATTEMPT_ID("stageAttemptId"),
+    SPARK_TASK_INDEX("taskIndex"),
+    SPARK_TASK_ATTEMPT_ID("taskAttemptId"),
+    SPARK_USER("user"),
+    SPARK_QUEUE("queue"),
+    SPARK_EXECUTOR_ID("executorId");
+
+
+    private String tagName;
+    private SparkJobTagName(String tagName){
+        this.tagName = tagName;
+    }
+
+    public String toString(){
+        return this.tagName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
new file mode 100644
index 0000000..d5147b6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.jpm.util.resourceFetch;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelector;
+import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelectorImpl;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppsWrapper;
+import org.apache.eagle.jpm.util.resourceFetch.url.JobListServiceURLBuilderImpl;
+import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import org.apache.eagle.jpm.util.resourceFetch.url.SparkCompleteJobServiceURLBuilderImpl;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class RMResourceFetcher implements ResourceFetcher{
+	
+	private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class);
+	private final HAURLSelector selector;
+	private final ServiceURLBuilder jobListServiceURLBuilder;
+	private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder;
+	
+	private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+	
+	static {
+		OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+	}
+	
+	public RMResourceFetcher(String[] RMBasePaths) {
+		this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
+		this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl();
+
+		this.selector = new HAURLSelectorImpl(RMBasePaths, jobListServiceURLBuilder, Constants.CompressionType.GZIP);
+	}
+	
+	private void checkUrl() throws IOException {
+		if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), Constants.JobState.RUNNING.name()))) {
+			selector.reSelectUrl();
+		}
+	}
+	
+	private List<Object> doFetchSparkFinishApplicationsList(String lastFinishTime) throws Exception {
+		List<AppInfo> result = null;
+		InputStream is = null;
+		try {
+			checkUrl();
+			final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), lastFinishTime);
+			LOG.info("Going to call yarn api to fetch finished spark job list: " + urlString);
+			is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP);
+			final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
+			if (appWrapper != null && appWrapper.getApps() != null
+					&& appWrapper.getApps().getApp() != null) {
+				result = appWrapper.getApps().getApp();
+				return Arrays.asList((Object)result);
+			}
+			return null;
+		}finally {
+			if (is != null) { try {is.close();} catch (Exception e){} }
+		}
+	}
+	
+
+	
+	public List<Object> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception{
+		switch(resoureType) {
+			case COMPLETE_SPARK_JOB:
+				return doFetchSparkFinishApplicationsList((String)parameter[0]);
+
+			default:
+				throw new Exception("Not support ressourceType :" + resoureType);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
new file mode 100644
index 0000000..b21d030
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch;
+
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.List;
+
+public interface ResourceFetcher {
+
+	List<Object> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
new file mode 100644
index 0000000..c13bee0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
@@ -0,0 +1,81 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
+import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import org.apache.eagle.jpm.util.resourceFetch.url.SparkJobServiceURLBuilderImpl;
+import org.apache.commons.codec.binary.Base64;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class SparkHistoryServerResourceFetcher implements ResourceFetcher{
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryServerResourceFetcher.class);
+
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    private String historyServerURL;
+    private final ServiceURLBuilder sparkDetailJobServiceURLBuilder;
+    private String auth;
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    public SparkHistoryServerResourceFetcher(String historyServerURL, String userName, String pwd){
+        this.historyServerURL = historyServerURL;
+        this.sparkDetailJobServiceURLBuilder = new SparkJobServiceURLBuilderImpl();
+        this.auth = "Basic " + new String(new Base64().encode(String.format("%s:%s", userName, pwd).getBytes()));;
+    }
+
+    private List<Object> doFetchSparkApplicationDetail(String appId) throws Exception {
+        InputStream is = null;
+        try {
+            final String urlString = sparkDetailJobServiceURLBuilder.build(this.historyServerURL, appId);
+            LOG.info("Going to call spark history server api to fetch spark job: " + urlString);
+            is = InputStreamUtils.getInputStream(urlString, auth, Constants.CompressionType.NONE);
+            SparkApplication app = OBJ_MAPPER.readValue(is, SparkApplication.class);
+            return Arrays.asList((Object)app);
+        } catch (FileNotFoundException e) {
+            return null;
+        } finally {
+            if (is != null) { try {is.close();} catch (Exception e) { } }
+        }
+    }
+
+    public List<Object> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception{
+        switch(resoureType) {
+            case SPARK_JOB_DETAIL:
+                return doFetchSparkApplicationDetail((String)parameter[0]);
+            default:
+                throw new Exception("Not support resourceType :" + resoureType);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
new file mode 100644
index 0000000..6d3fa45
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.connection;
+
+import org.apache.eagle.jpm.util.Constants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.zip.GZIPInputStream;
+
+public class InputStreamUtils {
+
+	private static final int CONNECTION_TIMEOUT = 10 * 1000;
+	private static final int READ_TIMEOUT = 5 * 60 * 1000;
+	private static final String GZIP_HTTP_HEADER = "Accept-Encoding";
+	private static final String GZIP_COMPRESSION = "gzip";
+	
+	private static InputStream openGZIPInputStream(URL url, String auth, int timeout) throws IOException {
+		final URLConnection connection = url.openConnection();
+		connection.setConnectTimeout(CONNECTION_TIMEOUT);
+		connection.setReadTimeout(timeout);
+		connection.addRequestProperty(GZIP_HTTP_HEADER, GZIP_COMPRESSION);
+		if (null != auth){
+			connection.setRequestProperty ("Authorization", auth);
+		}
+		return new GZIPInputStream(connection.getInputStream());
+	}
+	
+	private static InputStream openInputStream(URL url, String auth, int timeout) throws IOException {
+		URLConnection connection = url.openConnection();
+		connection.setConnectTimeout(timeout);
+		if (null != auth){
+			connection.setRequestProperty ("Authorization", auth);
+		}
+
+		return connection.getInputStream();
+	}
+	
+	public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType, int timeout) throws Exception {
+		final URL url = URLConnectionUtils.getUrl(urlString);
+		if (compressionType.equals(Constants.CompressionType.GZIP)) {
+			return openGZIPInputStream(url, auth, timeout);
+		}
+		else { // CompressionType.NONE
+			return openInputStream(url, auth, timeout);
+		}
+	}
+	
+	public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType) throws Exception {
+		return getInputStream(urlString, auth, compressionType, READ_TIMEOUT);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
new file mode 100644
index 0000000..2e7b248
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.connection;
+
+import org.apache.eagle.jpm.util.Constants;
+
+public class JobUtils {
+	
+	public static String checkAndAddLastSlash(String urlBase) {
+		if (!urlBase.endsWith("/")) {
+			return urlBase + "/";
+		}
+		return urlBase;
+	}
+	
+	public static String getJobIDByAppID(String appID) {
+		if (appID.startsWith(Constants.APPLICATION_PREFIX)) {
+			return appID.replace(Constants.APPLICATION_PREFIX, Constants.JOB_PREFIX);
+		}
+		return null;
+	}
+
+	public static String getAppIDByJobID(String jobID) {
+		if (jobID.startsWith(Constants.JOB_PREFIX)) {
+			return jobID.replace(Constants.JOB_PREFIX, Constants.APPLICATION_PREFIX);
+		}
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
new file mode 100644
index 0000000..d340d7b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.connection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+
+public final class URLConnectionUtils {
+	//TODO: change some public method to private
+    private static final Logger LOG = LoggerFactory.getLogger(URLConnectionUtils.class);
+	
+	public static URLConnection getConnection(String url) throws Exception {
+		if (url.startsWith("https://")) {
+			return getHTTPSConnection(url);
+		} else if (url.startsWith("http://")) {
+			return getHTTPConnection(url);
+		}
+		throw new Exception("Invalid input argument url: " + url);
+	}
+
+	public static URLConnection getHTTPConnection(String urlString) throws Exception {
+		final URL url = new URL(urlString);
+		return url.openConnection();
+	}
+
+	public static URL getUrl(String urlString) throws Exception  {
+		if(urlString.toLowerCase().contains("https")){
+			return getHTTPSUrl(urlString);
+		}else if (urlString.toLowerCase().contains("http")) {
+			return getURL(urlString);
+		}
+		throw new Exception("Invalid input argument url: " + urlString);
+	}
+	
+	public static URL getURL(String urlString) throws MalformedURLException {
+		return new URL(urlString);
+	}
+	
+	public static URL getHTTPSUrl(String urlString) throws MalformedURLException, NoSuchAlgorithmException, KeyManagementException  {
+    	// Create a trust manager that does not validate certificate chains   
+        final TrustManager[] trustAllCerts = new TrustManager[] {new TrustAllX509TrustManager()};
+        // Install the all-trusting trust manager   
+        final SSLContext sc = SSLContext.getInstance("SSL");   
+        sc.init(null, trustAllCerts, new java.security.SecureRandom());   
+        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());   
+        // Create all-trusting host name verifier   
+        final HostnameVerifier allHostsValid = new HostnameVerifier() {   
+            public boolean verify(String hostname, SSLSession session) {   
+                return true;   
+            }   
+        };
+        HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
+        return new URL(urlString);
+	}
+
+	public static URLConnection getHTTPSConnection(String urlString) throws IOException, KeyManagementException, NoSuchAlgorithmException  {
+       	final URL url = getHTTPSUrl(urlString);
+       	return url.openConnection();
+	}
+	
+	public static class TrustAllX509TrustManager implements X509TrustManager {
+		@Override
+		public void checkClientTrusted(
+				java.security.cert.X509Certificate[] chain, String authType)
+				throws CertificateException {
+		}
+
+		@Override
+		public void checkServerTrusted(
+				java.security.cert.X509Certificate[] chain, String authType)
+				throws CertificateException {
+		}
+
+		@Override
+		public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
new file mode 100644
index 0000000..6eea7e3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.ha;
+
+import java.io.IOException;
+
+public interface HAURLSelector {
+	
+	boolean checkUrl(String url);
+		
+	void reSelectUrl() throws IOException;
+	
+	String getSelectedUrl();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
new file mode 100644
index 0000000..6518ca1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.ha;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+public class HAURLSelectorImpl implements HAURLSelector {
+
+	private final String[] urls;
+	private volatile String selectedUrl;
+	private final ServiceURLBuilder builder;
+	
+	private volatile boolean reselectInProgress;
+	private final Constants.CompressionType compressionType;
+	private static final long MAX_RETRY_TIME = 3;
+	private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class);
+	
+	public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType compressionType) {
+		this.urls = urls;
+		this.compressionType = compressionType;
+		this.builder = builder;
+	}
+	
+	public boolean checkUrl(String urlString) {
+		InputStream is = null;
+		try {
+			is = InputStreamUtils.getInputStream(urlString, null, compressionType);
+		}
+		catch (Exception ex) {
+			LOG.info("get inputstream from url: " + urlString + " failed. ");
+			return false;
+		}
+		finally {
+			if (is != null) { try {	is.close(); } catch (IOException e) {/*Do nothing*/} }
+		}
+		return true;
+	}
+
+	@Override
+	public String getSelectedUrl() {
+		if (selectedUrl == null) {
+			selectedUrl = urls[0];
+		}
+		return selectedUrl;
+	}
+	
+	@Override
+	public void reSelectUrl() throws IOException {
+		if (reselectInProgress) return;
+		synchronized(this) {
+			if (reselectInProgress) return;
+			reselectInProgress = true;
+			try {
+				LOG.info("Going to reselect url");
+				for (int i = 0; i < urls.length; i++) {		
+					String urlToCheck = urls[i];
+					LOG.info("Going to try url :" + urlToCheck);
+					for (int time = 0; time < MAX_RETRY_TIME; time++) {
+						if (checkUrl(builder.build(urlToCheck, Constants.JobState.RUNNING.name()))) {
+							selectedUrl = urls[i];
+							LOG.info("Successfully switch to new url : " + selectedUrl);
+							return;
+						}
+						LOG.info("try url " + urlToCheck + "fail for " + (time+1) + " times, sleep 5 seconds before try again. ");
+						try {
+							Thread.sleep(5 * 1000);
+						}
+						catch (InterruptedException ex) { /* Do Nothing */}
+					}
+				}
+				throw new IOException("No alive url found: "+ StringUtils.join(";", Arrays.asList(this.urls)));
+			}
+			finally {
+				reselectInProgress = false;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
new file mode 100644
index 0000000..463ce1e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AppInfo {
+	String id;
+	String user;
+	String name;
+	String queue;
+	String state;
+	String finalStatus;
+	double progress;
+	String trackingUI;
+	String trackingUrl;
+	String diagnostics;
+	String clusterId;
+	String applicationType;
+	long startedTime;
+	long finishedTime;
+	long elapsedTime;
+	String amContainerLogs;
+	String amHostHttpAddress;
+	
+	public String getId() {
+		return id;
+	}
+	public void setId(String id) {
+		this.id = id;
+	}
+	public String getUser() {
+		return user;
+	}
+	public void setUser(String user) {
+		this.user = user;
+	}
+	public String getName() {
+		return name;
+	}
+	public void setName(String name) {
+		this.name = name;
+	}
+	public String getQueue() {
+		return queue;
+	}
+	public void setQueue(String queue) {
+		this.queue = queue;
+	}
+	public String getState() {
+		return state;
+	}
+	public void setState(String state) {
+		this.state = state;
+	}
+	public String getFinalStatus() {
+		return finalStatus;
+	}
+	public void setFinalStatus(String finalStatus) {
+		this.finalStatus = finalStatus;
+	}
+	public double getProgress() {
+		return progress;
+	}
+	public void setProgress(double progress) {
+		this.progress = progress;
+	}
+	public String getTrackingUI() {
+		return trackingUI;
+	}
+	public void setTrackingUI(String trackingUI) {
+		this.trackingUI = trackingUI;
+	}
+	public String getTrackingUrl() {
+		return trackingUrl;
+	}
+	public void setTrackingUrl(String trackingUrl) {
+		this.trackingUrl = trackingUrl;
+	}
+	public String getDiagnostics() {
+		return diagnostics;
+	}
+	public void setDiagnostics(String diagnostics) {
+		this.diagnostics = diagnostics;
+	}
+	public String getClusterId() {
+		return clusterId;
+	}
+	public void setClusterId(String clusterId) {
+		this.clusterId = clusterId;
+	}
+	public String getApplicationType() {
+		return applicationType;
+	}
+	public void setApplicationType(String applicationType) {
+		this.applicationType = applicationType;
+	}
+	public long getStartedTime() {
+		return startedTime;
+	}
+	public void setStartedTime(long startedTime) {
+		this.startedTime = startedTime;
+	}
+	public long getFinishedTime() {
+		return finishedTime;
+	}
+	public void setFinishedTime(long finishedTime) {
+		this.finishedTime = finishedTime;
+	}
+	public long getElapsedTime() {
+		return elapsedTime;
+	}
+	public void setElapsedTime(long elapsedTime) {
+		this.elapsedTime = elapsedTime;
+	}
+	public String getAmContainerLogs() {
+		return amContainerLogs;
+	}
+	public void setAmContainerLogs(String amContainerLogs) {
+		this.amContainerLogs = amContainerLogs;
+	}
+	public String getAmHostHttpAddress() {
+		return amHostHttpAddress;
+	}
+	public void setAmHostHttpAddress(String amHostHttpAddress) {
+		this.amHostHttpAddress = amHostHttpAddress;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
new file mode 100644
index 0000000..741fa1d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Applications {
+
+	private List<AppInfo> app;
+
+	public List<AppInfo> getApp() {
+		return app;
+	}
+
+	public void setApp(List<AppInfo> app) {
+		this.app = app;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
new file mode 100644
index 0000000..d791685
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AppsWrapper {
+	
+	private Applications apps;
+
+	public Applications getApps() {
+		return apps;
+	}
+
+	public void setApps(Applications apps) {
+		this.apps = apps;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
new file mode 100644
index 0000000..5d25d84
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkApplication {
+    String id;
+    String name;
+    List<SparkApplicationAttempt> attempts;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public List<SparkApplicationAttempt> getAttempts() {
+        return attempts;
+    }
+
+    public void setAttempts(List<SparkApplicationAttempt> attempts) {
+        this.attempts = attempts;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
new file mode 100644
index 0000000..6e91c03
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
@@ -0,0 +1,73 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkApplicationAttempt {
+    String attemptId;
+    String sparkUser;
+    String startTime;
+    String endTime;
+    boolean completed;
+
+    public String getAttemptId() {
+        return attemptId;
+    }
+
+    public void setAttemptId(String attemptId) {
+        this.attemptId = attemptId;
+    }
+
+    public String getSparkUser() {
+        return sparkUser;
+    }
+
+    public void setSparkUser(String sparkUser) {
+        this.sparkUser = sparkUser;
+    }
+
+    public String getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(String startTime) {
+        this.startTime = startTime;
+    }
+
+    public String getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(String endTime) {
+        this.endTime = endTime;
+    }
+
+    public boolean isCompleted() {
+        return completed;
+    }
+
+    public void setCompleted(boolean completed) {
+        this.completed = completed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
new file mode 100644
index 0000000..5508863
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkApplicationWrapper {
+
+    SparkApplication app;
+
+    public SparkApplication getApp() {
+        return app;
+    }
+
+    public void setApp(SparkApplication app) {
+        this.app = app;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
new file mode 100644
index 0000000..3e21e8d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.url;
+
+import org.apache.eagle.jpm.util.Constants;
+
+public class JobListServiceURLBuilderImpl implements ServiceURLBuilder {
+	
+	public String build(String ... parameters) {
+		// {rmUrl}/ws/v1/cluster/apps?state=RUNNING 
+		String jobState = parameters[1];
+		if (jobState.equals(Constants.JobState.RUNNING.name())) {
+			return parameters[0] + "/" + Constants.V2_APPS_RUNNING_URL + "&" + Constants.ANONYMOUS_PARAMETER;
+		}
+		else if (jobState.equals(Constants.JobState.COMPLETED.name())) {
+			return parameters[0] + "/" + Constants.V2_APPS_COMPLETED_URL + "&" + Constants.ANONYMOUS_PARAMETER;
+		}
+		else if (jobState.equals(Constants.JobState.ALL.name())) {
+			return parameters[0]  + "/" + Constants.V2_APPS_URL + "&" + Constants.ANONYMOUS_PARAMETER;
+		}
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
new file mode 100644
index 0000000..597e359
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.url;
+
+public interface ServiceURLBuilder {
+	String build(String... parameters);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
new file mode 100644
index 0000000..d4d235f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
@@ -0,0 +1,29 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.url;
+
+import org.apache.eagle.jpm.util.Constants;
+
+public class SparkCompleteJobServiceURLBuilderImpl implements ServiceURLBuilder {
+
+    public String build(String... parameters) {
+        return String.format("%s/%s?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin=%s&%s", parameters[0], Constants.V2_APPS_URL, parameters[1], Constants.ANONYMOUS_PARAMETER);
+    }
+}