You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:06 UTC

[10/52] [abbrv] incubator-eagle git commit: [EAGLE-460] Convert MR running app with new app framework

[EAGLE-460] Convert MR running app with new app framework

https://issues.apache.org/jira/browse/EAGLE-460

Author: Hao Chen <ha...@apache.org>

Closes #381 from haoch/EAGLE-460.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b54a63e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b54a63e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b54a63e0

Branch: refs/heads/master
Commit: b54a63e0497f6dbca6203fc3024ca649bc767f25
Parents: 0b852cb
Author: Hao Chen <ha...@apache.org>
Authored: Wed Aug 24 21:25:57 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Aug 24 21:25:57 2016 +0800

----------------------------------------------------------------------
 .../queue/model/scheduler/SchedulerInfo.java    |  86 ++++-----
 .../queue/model/scheduler/SchedulerWrapper.java |  16 +-
 .../hadoop/queue/model/scheduler/User.java      |  74 ++++----
 .../storm/HadoopQueueMetricPersistBolt.java     |  31 ++--
 .../storm/HadoopQueueRunningExtractor.java      |  23 +--
 .../apache/eagle/app/jpm/JPMApplication.java    |  13 +-
 eagle-jpm/eagle-jpm-mr-running/pom.xml          |   5 +
 .../jpm/mr/running/MRRunningJobApplication.java |  77 ++++++++
 .../MRRunningJobApplicationProvider.java        |  26 +++
 .../jpm/mr/running/MRRunningJobConfig.java      | 156 +++++++++++++++++
 .../eagle/jpm/mr/running/MRRunningJobMain.java  |  75 +-------
 .../running/config/MRRunningConfigManager.java  | 151 ----------------
 .../parser/MRJobEntityCreationHandler.java      |   6 +-
 .../jpm/mr/running/parser/MRJobParser.java      |  34 ++--
 .../mr/running/recover/MRRunningJobManager.java |   8 +-
 .../running/storm/MRRunningJobFetchSpout.java   |  30 ++--
 .../mr/running/storm/MRRunningJobParseBolt.java |  34 ++--
 ....running.MRRunningJobApplicationProvider.xml | 175 +++++++++++++++++++
 ...org.apache.eagle.app.spi.ApplicationProvider |  16 ++
 .../src/main/resources/application.conf         |   8 +-
 .../MRRunningJobApplicationProviderTest.java    |  35 ++++
 .../mr/running/MRRunningJobApplicationTest.java |  27 +++
 .../service/jpm/MRJobExecutionResource.java     |  34 ++--
 .../storm/SparkRunningJobFetchSpout.java        |  14 +-
 .../running/storm/SparkRunningJobParseBolt.java |   5 +-
 25 files changed, 733 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java
index 8ed7745..8c51600 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerInfo.java
@@ -21,63 +21,63 @@ package org.apache.eagle.hadoop.queue.model.scheduler;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SchedulerInfo {
-	private String type;
-	private double capacity;
-	private double usedCapacity;
-	private double maxCapacity;
-	private String queueName;
-	private Queues queues;
+    private String type;
+    private double capacity;
+    private double usedCapacity;
+    private double maxCapacity;
+    private String queueName;
+    private Queues queues;
 
-	public Queues getQueues() {
-		return queues;
-	}
+    public Queues getQueues() {
+        return queues;
+    }
 
-	public void setQueues(Queues queues) {
-		this.queues = queues;
-	}
+    public void setQueues(Queues queues) {
+        this.queues = queues;
+    }
 
 
-	public double getUsedCapacity() {
-		return usedCapacity;
-	}
+    public double getUsedCapacity() {
+        return usedCapacity;
+    }
 
-	public void setUsedCapacity(double usedCapacity) {
-		this.usedCapacity = usedCapacity;
-	}
+    public void setUsedCapacity(double usedCapacity) {
+        this.usedCapacity = usedCapacity;
+    }
 
-	public String getType() {
-		return type;
-	}
+    public String getType() {
+        return type;
+    }
 
-	public void setType(String type) {
-		this.type = type;
-	}
+    public void setType(String type) {
+        this.type = type;
+    }
 
-	public double getCapacity() {
-		return capacity;
-	}
+    public double getCapacity() {
+        return capacity;
+    }
 
-	public void setCapacity(double capacity) {
-		this.capacity = capacity;
-	}
+    public void setCapacity(double capacity) {
+        this.capacity = capacity;
+    }
 
-	public double getMaxCapacity() {
-		return maxCapacity;
-	}
+    public double getMaxCapacity() {
+        return maxCapacity;
+    }
 
-	public void setMaxCapacity(double maxCapacity) {
-		this.maxCapacity = maxCapacity;
-	}
+    public void setMaxCapacity(double maxCapacity) {
+        this.maxCapacity = maxCapacity;
+    }
 
-	public String getQueueName() {
-		return queueName;
-	}
+    public String getQueueName() {
+        return queueName;
+    }
 
-	public void setQueueName(String queueName) {
-		this.queueName = queueName;
-	}
+    public void setQueueName(String queueName) {
+        this.queueName = queueName;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java
index f181f2f..61b3685 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/SchedulerWrapper.java
@@ -21,16 +21,16 @@ package org.apache.eagle.hadoop.queue.model.scheduler;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SchedulerWrapper {
-	public Scheduler getScheduler() {
-		return scheduler;
-	}
+    public Scheduler getScheduler() {
+        return scheduler;
+    }
 
-	public void setScheduler(Scheduler scheduler) {
-		this.scheduler = scheduler;
-	}
+    public void setScheduler(Scheduler scheduler) {
+        this.scheduler = scheduler;
+    }
 
-	private Scheduler scheduler;
+    private Scheduler scheduler;
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java
index bac9eb2..7bddf71 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/User.java
@@ -21,43 +21,43 @@ package org.apache.eagle.hadoop.queue.model.scheduler;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class User {
-	private String username;
-	private ResourcesUsed resourcesUsed;
-	private int numPendingApplications;
-	private int numActiveApplications;
-
-	public String getUsername() {
-		return username;
-	}
-
-	public void setUsername(String username) {
-		this.username = username;
-	}
-
-	public ResourcesUsed getResourcesUsed() {
-		return resourcesUsed;
-	}
-
-	public void setResourcesUsed(ResourcesUsed resourcesUsed) {
-		this.resourcesUsed = resourcesUsed;
-	}
-
-	public int getNumPendingApplications() {
-		return numPendingApplications;
-	}
-
-	public void setNumPendingApplications(int numPendingApplications) {
-		this.numPendingApplications = numPendingApplications;
-	}
-
-	public int getNumActiveApplications() {
-		return numActiveApplications;
-	}
-
-	public void setNumActiveApplications(int numActiveApplications) {
-		this.numActiveApplications = numActiveApplications;
-	}
+    private String username;
+    private ResourcesUsed resourcesUsed;
+    private int numPendingApplications;
+    private int numActiveApplications;
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public ResourcesUsed getResourcesUsed() {
+        return resourcesUsed;
+    }
+
+    public void setResourcesUsed(ResourcesUsed resourcesUsed) {
+        this.resourcesUsed = resourcesUsed;
+    }
+
+    public int getNumPendingApplications() {
+        return numPendingApplications;
+    }
+
+    public void setNumPendingApplications(int numPendingApplications) {
+        this.numPendingApplications = numPendingApplications;
+    }
+
+    public int getNumActiveApplications() {
+        return numActiveApplications;
+    }
+
+    public void setNumActiveApplications(int numActiveApplications) {
+        this.numActiveApplications = numActiveApplications;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index c99ea41..db61841 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -18,19 +18,20 @@
 
 package org.apache.eagle.hadoop.queue.storm;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +40,7 @@ import java.util.Map;
 
 public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
 
-    private final static Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class);
 
     private Config config;
     private IEagleServiceClient client;
@@ -65,7 +66,7 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
         if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.METRIC.toString())) {
             List<GenericMetricEntity> metrics = (List<GenericMetricEntity>) data;
             writeMetrics(metrics);
-         } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
+        } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
             List<RunningQueueAPIEntity> entities = (List<RunningQueueAPIEntity>) data;
             writeEntities(entities);
         }
@@ -77,12 +78,12 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
 
     }
 
-    private void writeEntities(List<RunningQueueAPIEntity> entities){
+    private void writeEntities(List<RunningQueueAPIEntity> entities) {
         try {
             GenericServiceAPIResponseEntity response = client.create(entities);
-            if(!response.isSuccess()){
+            if (!response.isSuccess()) {
                 LOG.error("Got exception from eagle service: " + response.getException());
-            }else{
+            } else {
                 LOG.info("Successfully wrote " + entities.size() + " RunningQueueAPIEntity entities");
             }
         } catch (Exception e) {
@@ -91,12 +92,12 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
         entities.clear();
     }
 
-    private void writeMetrics(List<GenericMetricEntity> entities){
+    private void writeMetrics(List<GenericMetricEntity> entities) {
         try {
             GenericServiceAPIResponseEntity response = client.create(entities);
-            if(response.isSuccess()){
+            if (response.isSuccess()) {
                 LOG.info("Successfully wrote " + entities.size() + " GenericMetricEntity entities");
-            }else{
+            } else {
                 LOG.error(response.getException());
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
index 3c4391b..ef0c762 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
@@ -18,16 +18,17 @@
 
 package org.apache.eagle.hadoop.queue.storm;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import com.typesafe.config.Config;
-import org.apache.eagle.hadoop.queue.crawler.ClusterMetricsCrawler;
-import org.apache.eagle.hadoop.queue.crawler.RunningAppsCrawler;
-import org.apache.eagle.hadoop.queue.crawler.SchedulerInfoCrawler;
 import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
 import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
 import org.apache.eagle.hadoop.queue.common.YarnURLSelectorImpl;
+import org.apache.eagle.hadoop.queue.crawler.ClusterMetricsCrawler;
+import org.apache.eagle.hadoop.queue.crawler.RunningAppsCrawler;
+import org.apache.eagle.hadoop.queue.crawler.SchedulerInfoCrawler;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,10 +39,10 @@ import java.util.concurrent.*;
 
 public class HadoopQueueRunningExtractor {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(HadoopQueueRunningExtractor.class);
-    private final static int MAX_NUM_THREADS = 10;
-    private final static int MAX_WAIT_TIME = 10;
-    private final static String DEFAULT_SITE = "sandbox";
+    private static final Logger LOGGER = LoggerFactory.getLogger(HadoopQueueRunningExtractor.class);
+    private static final int MAX_NUM_THREADS = 10;
+    private static final int MAX_WAIT_TIME = 10;
+    private static final String DEFAULT_SITE = "sandbox";
 
     private String site;
     private String urlBases;
@@ -53,10 +54,10 @@ public class HadoopQueueRunningExtractor {
     public HadoopQueueRunningExtractor(Config eagleConf, SpoutOutputCollector collector) {
         site = HadoopYarnResourceUtils.getConfigValue(eagleConf, "eagleProps.site", DEFAULT_SITE);
         urlBases = HadoopYarnResourceUtils.getConfigValue(eagleConf, "dataSourceConfig.RMEndPoints", "");
-        if(urlBases == null){
+        if (urlBases == null) {
             throw new IllegalArgumentException(site + ".baseurl is null");
         }
-        String [] urls = urlBases.split(",");
+        String[] urls = urlBases.split(",");
         urlSelector = new YarnURLSelectorImpl(urls, Constants.CompressionType.GZIP);
         executorService = Executors.newFixedThreadPool(MAX_NUM_THREADS);
         this.collector = collector;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
index d084008..68b7eff 100644
--- a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
+++ b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
@@ -16,6 +16,9 @@
  */
 package org.apache.eagle.app.jpm;
 
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+
 import backtype.storm.generated.StormTopology;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
@@ -24,8 +27,6 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
-import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
 
 import java.util.Arrays;
 import java.util.Map;
@@ -41,16 +42,16 @@ public class JPMApplication extends StormApplication {
     }
 
     private class RandomEventSpout extends BaseRichSpout {
-        private SpoutOutputCollector _collector;
+        private SpoutOutputCollector collector;
         @Override
         public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-            _collector = spoutOutputCollector;
+            collector = spoutOutputCollector;
         }
 
         @Override
         public void nextTuple() {
-            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
-            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+            collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+            collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml
index 414a221..d4ca4ce 100644
--- a/eagle-jpm/eagle-jpm-mr-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -80,6 +80,11 @@
             <artifactId>eagle-jpm-entity</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <resources>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
new file mode 100644
index 0000000..21ee1d9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.running;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.List;
+
+public class MRRunningJobApplication extends StormApplication {
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        //1. trigger init conf
+        MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.getInstance(config);
+
+        List<String> confKeyKeys = mrRunningJobConfig.getConfig().getStringList("MRConfigureKeys.jobConfigKey");
+        confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
+        confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
+        confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
+        confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
+        confKeyKeys.add(0, mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobNameKey"));
+
+        //2. init topology
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        String spoutName = "mrRunningJobFetchSpout";
+        String boltName = "mrRunningJobParseBolt";
+        int parallelism = mrRunningJobConfig.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
+        int tasks = mrRunningJobConfig.getConfig().getInt("envContextConfig.tasks." + spoutName);
+        if (parallelism > tasks) {
+            parallelism = tasks;
+        }
+        topologyBuilder.setSpout(
+            spoutName,
+            new MRRunningJobFetchSpout(
+                mrRunningJobConfig.getJobExtractorConfig(),
+                mrRunningJobConfig.getEndpointConfig(),
+                mrRunningJobConfig.getZkStateConfig()),
+            parallelism
+        ).setNumTasks(tasks);
+
+        parallelism = mrRunningJobConfig.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
+        tasks = mrRunningJobConfig.getConfig().getInt("envContextConfig.tasks." + boltName);
+        if (parallelism > tasks) {
+            parallelism = tasks;
+        }
+        topologyBuilder.setBolt(boltName,
+            new MRRunningJobParseBolt(
+                mrRunningJobConfig.getEagleServiceConfig(),
+                mrRunningJobConfig.getEndpointConfig(),
+                mrRunningJobConfig.getJobExtractorConfig(),
+                mrRunningJobConfig.getZkStateConfig(),
+                confKeyKeys),
+            parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
+        return topologyBuilder.createTopology();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
new file mode 100644
index 0000000..45a841b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.running;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+public class MRRunningJobApplicationProvider extends AbstractApplicationProvider<MRRunningJobApplication> {
+    @Override
+    public MRRunningJobApplication getApplication() {
+        return new MRRunningJobApplication();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
new file mode 100644
index 0000000..ec6740b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running;
+
+import org.apache.eagle.common.config.ConfigOptionParser;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class MRRunningJobConfig implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobConfig.class);
+
+    public String getEnv() {
+        return env;
+    }
+
+    private String env;
+
+    public ZKStateConfig getZkStateConfig() {
+        return zkStateConfig;
+    }
+
+    private ZKStateConfig zkStateConfig;
+
+    public EagleServiceConfig getEagleServiceConfig() {
+        return eagleServiceConfig;
+    }
+
+    private EagleServiceConfig eagleServiceConfig;
+
+    public JobExtractorConfig getJobExtractorConfig() {
+        return jobExtractorConfig;
+    }
+
+    private JobExtractorConfig jobExtractorConfig;
+
+    public EndpointConfig getEndpointConfig() {
+        return endpointConfig;
+    }
+
+    private EndpointConfig endpointConfig;
+
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+        public String zkPort;
+    }
+
+    public static class EagleServiceConfig implements Serializable {
+        public String eagleServiceHost;
+        public int eagleServicePort;
+        public int readTimeoutSeconds;
+        public int maxFlushNum;
+        public String username;
+        public String password;
+    }
+
+    public static class JobExtractorConfig implements Serializable {
+        public String site;
+        public int fetchRunningJobInterval;
+        public int parseJobThreadPoolSize;
+        public int topAndBottomTaskByElapsedTime;
+    }
+
+    public static class EndpointConfig implements Serializable {
+        public String[] rmUrls;
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    private Config config;
+
+    private static MRRunningJobConfig manager = new MRRunningJobConfig();
+
+    private MRRunningJobConfig() {
+        this.eagleServiceConfig = new EagleServiceConfig();
+        this.jobExtractorConfig = new JobExtractorConfig();
+        this.endpointConfig = new EndpointConfig();
+        this.zkStateConfig = new ZKStateConfig();
+    }
+
+    public static MRRunningJobConfig getInstance(String[] args) {
+        try {
+            LOG.info("Loading from configuration file");
+            return getInstance(new ConfigOptionParser().load(args));
+        } catch (Exception e) {
+            LOG.error("failed to load config");
+            throw new IllegalArgumentException("Failed to load config", e);
+        }
+    }
+
+    public static MRRunningJobConfig getInstance(Config config) {
+        manager.init(config);
+        return manager;
+    }
+
+    private void init(Config config) {
+        this.config = config;
+        this.env = config.getString("envContextConfig.env");
+
+        //parse eagle zk
+        this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+
+        // parse eagle service endpoint
+        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+        String port = config.getString("eagleProps.eagleService.port");
+        this.eagleServiceConfig.eagleServicePort = Integer.parseInt(port);
+        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
+        this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+        //parse job extractor
+        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
+        this.jobExtractorConfig.parseJobThreadPoolSize = config.getInt("jobExtractorConfig.parseJobThreadPoolSize");
+        this.jobExtractorConfig.topAndBottomTaskByElapsedTime = config.getInt("jobExtractorConfig.topAndBottomTaskByElapsedTime");
+
+        //parse data source config
+        this.endpointConfig.rmUrls = config.getString("dataSourceConfig.rmUrls").split(",");
+
+        LOG.info("Successfully initialized MRRunningJobConfig");
+        LOG.info("env: " + this.env);
+        LOG.info("site: " + this.jobExtractorConfig.site);
+        LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
+        LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
index 87079fd..a3d6d74 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
@@ -18,79 +18,8 @@
 
 package org.apache.eagle.jpm.mr.running;
 
-
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
-import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
-import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
-import org.apache.eagle.jpm.util.Constants;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import java.util.List;
-
 public class MRRunningJobMain {
     public static void main(String[] args) {
-
-        try {
-            //1. trigger init conf
-            MRRunningConfigManager mrRunningConfigManager = MRRunningConfigManager.getInstance(args);
-
-            List<String> confKeyKeys = mrRunningConfigManager.getConfig().getStringList("MRConfigureKeys.jobConfigKey");
-            confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
-            confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
-            confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
-            confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
-            confKeyKeys.add(0, mrRunningConfigManager.getConfig().getString("MRConfigureKeys.jobNameKey"));
-
-            //2. init topology
-            TopologyBuilder topologyBuilder = new TopologyBuilder();
-            String topologyName = mrRunningConfigManager.getConfig().getString("envContextConfig.topologyName");
-            String spoutName = "mrRunningJobFetchSpout";
-            String boltName = "mrRunningJobParseBolt";
-            int parallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
-            int tasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
-            if (parallelism > tasks) {
-                parallelism = tasks;
-            }
-            topologyBuilder.setSpout(
-                    spoutName,
-                    new MRRunningJobFetchSpout(
-                            mrRunningConfigManager.getJobExtractorConfig(),
-                            mrRunningConfigManager.getEndpointConfig(),
-                            mrRunningConfigManager.getZkStateConfig()),
-                    parallelism
-            ).setNumTasks(tasks);
-
-            parallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
-            tasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + boltName);
-            if (parallelism > tasks) {
-                parallelism = tasks;
-            }
-            topologyBuilder.setBolt(boltName,
-                    new MRRunningJobParseBolt(
-                            mrRunningConfigManager.getEagleServiceConfig(),
-                            mrRunningConfigManager.getEndpointConfig(),
-                            mrRunningConfigManager.getJobExtractorConfig(),
-                            mrRunningConfigManager.getZkStateConfig(),
-                            confKeyKeys),
-                    parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
-
-            backtype.storm.Config config = new backtype.storm.Config();
-            config.setNumWorkers(mrRunningConfigManager.getConfig().getInt("envContextConfig.workers"));
-            config.put(Config.TOPOLOGY_DEBUG, true);
-            if (!mrRunningConfigManager.getEnv().equals("local")) {
-                //cluster mode
-                //parse conf here
-                StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
-            } else {
-                //local mode
-                LocalCluster cluster = new LocalCluster();
-                cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        new MRRunningJobApplication().run(args);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
deleted file mode 100644
index 42426e4..0000000
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.running.config;
-
-import org.apache.eagle.common.config.ConfigOptionParser;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-
-public class MRRunningConfigManager implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(MRRunningConfigManager.class);
-
-    public String getEnv() {
-        return env;
-    }
-
-    private String env;
-
-    public ZKStateConfig getZkStateConfig() {
-        return zkStateConfig;
-    }
-
-    private ZKStateConfig zkStateConfig;
-
-    public EagleServiceConfig getEagleServiceConfig() {
-        return eagleServiceConfig;
-    }
-
-    private EagleServiceConfig eagleServiceConfig;
-
-    public JobExtractorConfig getJobExtractorConfig() {
-        return jobExtractorConfig;
-    }
-
-    private JobExtractorConfig jobExtractorConfig;
-
-    public EndpointConfig getEndpointConfig() {
-        return endpointConfig;
-    }
-
-    private EndpointConfig endpointConfig;
-
-    public static class ZKStateConfig implements Serializable {
-        public String zkQuorum;
-        public String zkRoot;
-        public int zkSessionTimeoutMs;
-        public int zkRetryTimes;
-        public int zkRetryInterval;
-        public String zkPort;
-    }
-
-    public static class EagleServiceConfig implements Serializable {
-        public String eagleServiceHost;
-        public int eagleServicePort;
-        public int readTimeoutSeconds;
-        public int maxFlushNum;
-        public String username;
-        public String password;
-    }
-
-    public static class JobExtractorConfig implements Serializable {
-        public String site;
-        public int fetchRunningJobInterval;
-        public int parseJobThreadPoolSize;
-        public int topAndBottomTaskByElapsedTime;
-    }
-
-    public static class EndpointConfig implements Serializable {
-        public String[] rmUrls;
-    }
-
-    public Config getConfig() {
-        return config;
-    }
-
-    private Config config;
-
-    private static MRRunningConfigManager manager = new MRRunningConfigManager();
-
-    private MRRunningConfigManager() {
-        this.eagleServiceConfig = new EagleServiceConfig();
-        this.jobExtractorConfig = new JobExtractorConfig();
-        this.endpointConfig = new EndpointConfig();
-        this.zkStateConfig = new ZKStateConfig();
-    }
-
-    public static MRRunningConfigManager getInstance(String[] args) {
-        manager.init(args);
-        return manager;
-    }
-
-    private void init(String[] args) {
-        try {
-            LOG.info("Loading from configuration file");
-            this.config = new ConfigOptionParser().load(args);
-        } catch (Exception e) {
-            LOG.error("failed to load config");
-        }
-
-        this.env = config.getString("envContextConfig.env");
-
-        //parse eagle zk
-        this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
-        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
-        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
-        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
-        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
-
-        // parse eagle service endpoint
-        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
-        String port = config.getString("eagleProps.eagleService.port");
-        this.eagleServiceConfig.eagleServicePort = Integer.parseInt(port);
-        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
-        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
-        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
-        this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
-        //parse job extractor
-        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
-        this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
-        this.jobExtractorConfig.parseJobThreadPoolSize = config.getInt("jobExtractorConfig.parseJobThreadPoolSize");
-        this.jobExtractorConfig.topAndBottomTaskByElapsedTime = config.getInt("jobExtractorConfig.topAndBottomTaskByElapsedTime");
-
-        //parse data source config
-        this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]);
-
-        LOG.info("Successfully initialized MRRunningConfigManager");
-        LOG.info("env: " + this.env);
-        LOG.info("site: " + this.jobExtractorConfig.site);
-        LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
-        LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index a43ed16..1a0fb61 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.running.parser;
 
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener;
 import org.apache.eagle.jpm.mr.running.parser.metrics.TaskExecutionMetricsCreationListener;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
@@ -37,11 +37,11 @@ public class MRJobEntityCreationHandler {
     private static final Logger LOG = LoggerFactory.getLogger(MRJobEntityCreationHandler.class);
 
     private List<TaggedLogAPIEntity> entities = new ArrayList<>();
-    private MRRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+    private MRRunningJobConfig.EagleServiceConfig eagleServiceConfig;
     private JobExecutionMetricsCreationListener jobMetricsListener;
     private TaskExecutionMetricsCreationListener taskMetricsListener;
 
-    public MRJobEntityCreationHandler(MRRunningConfigManager.EagleServiceConfig eagleServiceConfig) {
+    public MRJobEntityCreationHandler(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig) {
         this.eagleServiceConfig = eagleServiceConfig;
         jobMetricsListener = new JobExecutionMetricsCreationListener();
         taskMetricsListener = new TaskExecutionMetricsCreationListener();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 3b31d93..9148c0c 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.running.parser;
 
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
 import org.apache.eagle.jpm.mr.runningentity.JobConfig;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
@@ -77,14 +77,14 @@ public class MRJobParser implements Runnable {
     private boolean first;
     private Set<String> finishedTaskIds;
     private List<String> configKeys;
-    private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+    private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
 
     static {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
     }
 
-    public MRJobParser(MRRunningConfigManager.JobExtractorConfig jobExtractorConfig,
-                       MRRunningConfigManager.EagleServiceConfig eagleServiceConfig,
+    public MRJobParser(MRRunningJobConfig.JobExtractorConfig jobExtractorConfig,
+                       MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
                        AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
                        MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher,
                        List<String> configKeys) {
@@ -393,8 +393,8 @@ public class MRJobParser implements Runnable {
         Comparator<MRTask> byElapsedTimeDecrease = (e1, e2) -> -1 * Long.compare(e1.getElapsedTime(), e2.getElapsedTime());
         //2, get finished bottom n
         Iterator<MRTask> taskIteratorIncrease = tasks.stream()
-                .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
-                .sorted(byElapsedTimeIncrease).iterator();
+            .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
+            .sorted(byElapsedTimeIncrease).iterator();
         int i = 0;
         while (taskIteratorIncrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
             MRTask mrTask = taskIteratorIncrease.next();
@@ -405,8 +405,8 @@ public class MRJobParser implements Runnable {
         }
         //3, fetch finished top n
         Iterator<MRTask> taskIteratorDecrease = tasks.stream()
-                .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
-                .sorted(byElapsedTimeDecrease).iterator();
+            .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
+            .sorted(byElapsedTimeDecrease).iterator();
         i = 0;
         while (taskIteratorDecrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
             MRTask mrTask = taskIteratorDecrease.next();
@@ -417,8 +417,8 @@ public class MRJobParser implements Runnable {
         }
         //4, fetch running top n
         taskIteratorDecrease = tasks.stream()
-                .filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
-                .sorted(byElapsedTimeDecrease).iterator();
+            .filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
+            .sorted(byElapsedTimeDecrease).iterator();
         i = 0;
         while (taskIteratorDecrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
             MRTask mrTask = taskIteratorDecrease.next();
@@ -560,12 +560,12 @@ public class MRJobParser implements Runnable {
                     //we must flush entities before delete from zk in case of missing finish state of jobs
                     //delete from zk if needed
                     mrJobEntityMap.keySet()
-                            .stream()
-                            .filter(
-                                jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString())
-                                    || mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString()))
-                            .forEach(
-                                jobId -> this.runningJobManager.delete(app.getId(), jobId));
+                        .stream()
+                        .filter(
+                            jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString())
+                                || mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString()))
+                        .forEach(
+                            jobId -> this.runningJobManager.delete(app.getId(), jobId));
                 }
 
                 LOG.info("finish process yarn application " + app.getId());
@@ -575,4 +575,4 @@ public class MRJobParser implements Runnable {
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
index 75650b7..50b4726 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
@@ -18,11 +18,13 @@
 
 package org.apache.eagle.jpm.mr.running.recover;
 
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
-import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,7 +32,7 @@ import java.util.Map;
 public class MRRunningJobManager implements Serializable {
     private RunningJobManager runningJobManager;
 
-    public MRRunningJobManager(MRRunningConfigManager.ZKStateConfig config) {
+    public MRRunningJobManager(MRRunningJobConfig.ZKStateConfig config) {
         this.runningJobManager = new RunningJobManager(config.zkQuorum,
                 config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
index ebb9144..268912c 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -18,13 +18,7 @@
 
 package org.apache.eagle.jpm.mr.running.storm;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
@@ -32,6 +26,13 @@ import org.apache.eagle.jpm.util.Utils;
 import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,17 +41,18 @@ import java.util.*;
 
 public class MRRunningJobFetchSpout extends BaseRichSpout {
     private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobFetchSpout.class);
-    private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
-    private MRRunningConfigManager.EndpointConfig endpointConfig;
-    private MRRunningConfigManager.ZKStateConfig zkStateConfig;
+    private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
+    private MRRunningJobConfig.EndpointConfig endpointConfig;
+    private MRRunningJobConfig.ZKStateConfig zkStateConfig;
     private ResourceFetcher resourceFetcher;
     private SpoutOutputCollector collector;
     private boolean init;
     private transient MRRunningJobManager runningJobManager;
     private Set<String> runningYarnApps;
-    public MRRunningJobFetchSpout(MRRunningConfigManager.JobExtractorConfig jobExtractorConfig,
-                                  MRRunningConfigManager.EndpointConfig endpointConfig,
-                                  MRRunningConfigManager.ZKStateConfig zkStateConfig) {
+
+    public MRRunningJobFetchSpout(MRRunningJobConfig.JobExtractorConfig jobExtractorConfig,
+                                  MRRunningJobConfig.EndpointConfig endpointConfig,
+                                  MRRunningJobConfig.ZKStateConfig zkStateConfig) {
         this.jobExtractorConfig = jobExtractorConfig;
         this.endpointConfig = endpointConfig;
         this.zkStateConfig = zkStateConfig;
@@ -168,4 +170,4 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
     @Override
     public void close() {
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 0dccd70..3174eb1 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -18,12 +18,7 @@
 
 package org.apache.eagle.jpm.mr.running.storm;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
@@ -31,6 +26,12 @@ import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.topology.base.BaseRichBolt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,19 +42,20 @@ import java.util.concurrent.Executors;
 public class MRRunningJobParseBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobParseBolt.class);
 
-    private MRRunningConfigManager.EndpointConfig endpointConfig;
-    private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
-    private MRRunningConfigManager.ZKStateConfig zkStateConfig;
+    private MRRunningJobConfig.EndpointConfig endpointConfig;
+    private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
+    private MRRunningJobConfig.ZKStateConfig zkStateConfig;
     private ExecutorService executorService;
     private Map<String, MRJobParser> runningMRParsers;
     private transient MRRunningJobManager runningJobManager;
-    private MRRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+    private MRRunningJobConfig.EagleServiceConfig eagleServiceConfig;
     private ResourceFetcher resourceFetcher;
     private List<String> configKeys;
-    public MRRunningJobParseBolt(MRRunningConfigManager.EagleServiceConfig eagleServiceConfig,
-                                 MRRunningConfigManager.EndpointConfig endpointConfig,
-                                 MRRunningConfigManager.JobExtractorConfig jobExtractorConfig,
-                                 MRRunningConfigManager.ZKStateConfig zkStateConfig,
+
+    public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
+                                 MRRunningJobConfig.EndpointConfig endpointConfig,
+                                 MRRunningJobConfig.JobExtractorConfig jobExtractorConfig,
+                                 MRRunningJobConfig.ZKStateConfig zkStateConfig,
                                  List<String> configKeys) {
         this.eagleServiceConfig = eagleServiceConfig;
         this.endpointConfig = endpointConfig;
@@ -96,8 +98,8 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
                     LOG.info("remove parser {}", appId);
                 });
 
-        if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) ||
-                applicationParser.status() == MRJobParser.ParserStatus.FINISHED) {
+        if (appInfo.getState().equals(Constants.AppState.FINISHED.toString())
+            || applicationParser.status() == MRJobParser.ParserStatus.FINISHED) {
             applicationParser.setStatus(MRJobParser.ParserStatus.RUNNING);
             executorService.execute(applicationParser);
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
new file mode 100644
index 0000000..5f809f3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>MR_RUNNING_JOB_APP</type>
+    <name>MR Running Job Monitoring</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.jpm.mr.running.MRRunningJobApplication</appClass>
+    <viewPath>/apps/jpm</viewPath>
+    <configuration>
+        <!-- org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig -->
+        <property>
+            <name>envContextConfig.env</name>
+            <value>local</value>
+            <displayName>Environment</displayName>
+            <description>Execution environment</description>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkQuorum</name>
+            <displayName>zkQuorum</displayName>
+            <description>Zookeeper Quorum</description>
+            <value>sandbox.hortonworks.com:2181</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkPort</name>
+            <displayName>zkPort</displayName>
+            <description>Zookeeper Port</description>
+            <value>2181</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkSessionTimeoutMs</name>
+            <displayName>zkSessionTimeoutMs</displayName>
+            <description>Zookeeper session timeoutMs</description>
+            <value>15000</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkRetryTimes</name>
+            <displayName>zkRetryTimes</displayName>
+            <description>zookeeperConfig.zkRetryTimes</description>
+            <value>3</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkRetryInterval</name>
+            <displayName>zkRetryInterval</displayName>
+            <description>zookeeperConfig.zkRetryInterval</description>
+            <value>20000</value>
+        </property>
+        <property>
+            <name>zookeeperConfig.zkRoot</name>
+            <value>/apps/mr/running</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <description>eagleProps.eagleService.host</description>
+            <value>sandbox.hortonworks.com</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <description>eagleProps.eagleService.port</description>
+            <value>9099</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <description>eagleProps.eagleService.username</description>
+            <value>admin</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <description>eagleProps.eagleService.password</description>
+            <value>secret</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.readTimeOutSeconds</name>
+            <description>eagleProps.eagleService.readTimeOutSeconds</description>
+            <value>20</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.maxFlushNum</name>
+            <description>eagleProps.eagleService.maxFlushNum</description>
+            <value>500</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.site</name>
+            <description>jobExtractorConfig.site</description>
+            <value>sandbox</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.fetchRunningJobInterval</name>
+            <description>jobExtractorConfig.fetchRunningJobInterval</description>
+            <value>15</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.parseThreadPoolSize</name>
+            <description>jobExtractorConfig.parseThreadPoolSize</description>
+            <value>5</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.topAndBottomTaskByElapsedTime</name>
+            <description>jobExtractorConfig.topAndBottomTaskByElapsedTime</description>
+            <value>5</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.rmUrls</name>
+            <description>dataSourceConfig.rmUrls</description>
+            <value>http://sandbox.hortonworks.com:50030</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.nnEndpoint</name>
+            <description>dataSourceConfig.nnEndpoint</description>
+            <value>hdfs://sandbox.hortonworks.com:8020</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.keytab</name>
+            <description>dataSourceConfig.keytab</description>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.principal</name>
+            <description>dataSourceConfig.principal</description>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.rmUrls</name>
+            <description>dataSourceConfig.rmUrls</description>
+            <value>http://sandbox.hortonworks.com:8088</value>
+        </property>
+    </configuration>
+    <docs>
+        <install>
+            # Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+            ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1
+
+            # Step 2: Set up data collector to flow data into kafka topic in
+
+            ./bin/logstash -f log_collector.conf
+
+            ## `log_collector.conf` sample as following:
+
+            input {
+
+            }
+            filter {
+
+            }
+            output{
+
+            }
+
+            # Step 3: start application
+
+            # Step 4: monitor with featured portal or alert with policies
+        </install>
+        <uninstall>
+            # Step 1: stop and uninstall application
+            # Step 2: delete kafka topic named "${site}_example_source_topic"
+            # Step 3: stop logstash
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..61ec08c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
index f15fc2d..4b6d4fe 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
@@ -14,10 +14,10 @@
 # limitations under the License.
 
 {
+  "appId":"mrRunningJob",
+  "mode":"LOCAL",
   "envContextConfig" : {
     "env" : "local",
-    "topologyName" : "mrRunningJob",
-    "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
       "mrRunningJobFetchSpout" : 1,
       "mrRunningJobParseBolt" : 10
@@ -44,11 +44,9 @@
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000
   },
-
   "dataSourceConfig" : {
-    "rmUrls": ["http://sandbox.hortonworks.com:50030"]
+    "rmUrls": "http://sandbox.hortonworks.com:50030"
   },
-
   "eagleProps" : {
     "mailHost" : "abc.com",
     "mailDebug" : "true",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProviderTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProviderTest.java
new file mode 100644
index 0000000..2b7d429
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProviderTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.running;
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.AppJUnitRunner;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(AppJUnitRunner.class)
+public class MRRunningJobApplicationProviderTest {
+    @Inject
+    private
+    ApplicationSimulator simulator;
+
+    @Test
+    public void testStartAsManagedApplication(){
+        simulator.start(MRRunningJobApplicationProvider.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
new file mode 100644
index 0000000..3ec9089
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.running;
+
+import com.typesafe.config.ConfigFactory;
+import org.junit.Test;
+
+public class MRRunningJobApplicationTest {
+    @Test
+    public void testRunApplicationWithCLI(){
+        new MRRunningJobApplication().run(ConfigFactory.load());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
index be90456..3e487ae 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
@@ -18,28 +18,30 @@
 
 package org.apache.eagle.service.jpm;
 
-import org.apache.commons.lang.time.StopWatch;
+
+import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
+
 import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.generic.GenericEntityServiceResource;
+
+import org.apache.commons.lang.time.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
-import java.util.*;
-
-import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
 
 @Path("mrJobs")
 public class MRJobExecutionResource {
     GenericEntityServiceResource resource = new GenericEntityServiceResource();
-    public final static String ELAPSEDMS = "elapsedms";
-    public final static String TOTAL_RESULTS = "totalResults";
+    public static final String ELAPSEDMS = "elapsedms";
+    public static final String TOTAL_RESULTS = "totalResults";
 
-    private final static Logger LOG = LoggerFactory.getLogger(MRJobExecutionResource.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MRJobExecutionResource.class);
 
     @GET
     @Produces(MediaType.APPLICATION_JSON)
@@ -57,7 +59,7 @@ public class MRJobExecutionResource {
         List<TaggedLogAPIEntity> jobs = new ArrayList<>();
         List<TaggedLogAPIEntity> finishedJobs = new ArrayList<>();
         Set<String> jobIds = new HashSet<>();
-        Map<String,Object> meta = new HashMap<>();
+        final Map<String,Object> meta = new HashMap<>();
         StopWatch stopWatch = new StopWatch();
 
         stopWatch.start();
@@ -134,7 +136,7 @@ public class MRJobExecutionResource {
         List<TaggedLogAPIEntity> jobs = new ArrayList<>();
         Set<String> jobIds = new HashSet<>();
         String condition = buildCondition(jobId, jobDefId, site);
-        int pageSize = Integer.MAX_VALUE;
+        final int pageSize = Integer.MAX_VALUE;
         if (condition == null) {
             response.setException(new Exception("Search condition is empty"));
             response.setSuccess(false);
@@ -142,7 +144,7 @@ public class MRJobExecutionResource {
         }
         LOG.debug("search condition=" + condition);
 
-        Map<String,Object> meta = new HashMap<>();
+        final Map<String,Object> meta = new HashMap<>();
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         String queryFormat = "%s[%s]{*}";
@@ -245,11 +247,11 @@ public class MRJobExecutionResource {
 
         List<Long> times = parseTimeList(timeList);
         String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId);
-        GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> history_res =
+        GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> historyRes =
                 resource.search(query,  null, null, Integer.MAX_VALUE, null, false, true,  0L, 0, true, 0, null, false);
-        if (history_res.isSuccess() && history_res.getObj() != null && history_res.getObj().size() > 0) {
+        if (historyRes.isSuccess() && historyRes.getObj() != null && historyRes.getObj().size() > 0) {
             initTaskCountList(runningTaskCount, finishedTaskCount, times, new HistoryTaskComparator());
-            for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : history_res.getObj()) {
+            for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : historyRes.getObj()) {
                 int index = getPosition(times, o.getDuration());
                 MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index);
                 counter.taskCount++;
@@ -257,11 +259,11 @@ public class MRJobExecutionResource {
             }
         } else {
             query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId);
-            GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> running_res =
+            GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> runningRes =
                     resource.search(query,  null, null, Integer.MAX_VALUE, null, false, true,  0L, 0, true, 0, null, false);
-            if (running_res.isSuccess() && running_res.getObj() != null) {
+            if (runningRes.isSuccess() && runningRes.getObj() != null) {
                 initTaskCountList(runningTaskCount, finishedTaskCount, times, new RunningTaskComparator());
-                for (TaskExecutionAPIEntity o : running_res.getObj()) {
+                for (TaskExecutionAPIEntity o : runningRes.getObj()) {
                     int index = getPosition(times, o.getDuration());
                     if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
                         MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
index ccdfe79..7162bac 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -18,12 +18,6 @@
 
 package org.apache.eagle.jpm.spark.running.storm;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
 import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
 import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
@@ -31,6 +25,13 @@ import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -143,6 +144,7 @@ public class SparkRunningJobFetchSpout extends BaseRichSpout {
             try {
                 Thread.sleep(jobExtractorConfig.fetchRunningJobInterval * 1000);
             } catch (Exception e) {
+                // ignored
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b54a63e0/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index c41804b..a497e29 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -53,6 +53,7 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
     private ExecutorService executorService;
     private Map<String, SparkApplicationParser> runningSparkParsers;
     private ResourceFetcher resourceFetcher;
+
     public SparkRunningJobParseBolt(SparkRunningJobAppConfig.ZKStateConfig zkStateConfig,
                                     SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig,
                                     SparkRunningJobAppConfig.EndpointConfig endpointConfig,
@@ -94,8 +95,8 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
                     LOG.info("remove parser {}", appId);
                 });
 
-        if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) ||
-                applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
+        if (appInfo.getState().equals(Constants.AppState.FINISHED.toString())
+            || applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
             applicationParser.setStatus(SparkApplicationParser.ParserStatus.RUNNING);
             executorService.execute(applicationParser);
         }