You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/09/27 03:37:17 UTC

incubator-eagle git commit: [EAGLE-564] pack mr history/running feeder in one topology and bugs fix

Repository: incubator-eagle
Updated Branches:
  refs/heads/master ad6a53490 -> 45ff4b8f5


[EAGLE-564] pack mr history/running feeder in one topology and bugs fix

https://issues.apache.org/jira/browse/EAGLE-564

Pack MR history/Running feeder in the final topology by using topology-assembly.
Changes:
1. Remove unused configuration.
2. Add the MR history/running application providers to the topology-assembly SPI.
Bug fix:
1. Storm configuration (workers, timeout, ...) was not overridden by user-defined configuration.
2. When installing an application, the user may not set jarPath. In that case the jar path is resolved dynamically; otherwise the jarPath provided by the user is used.

Author: wujinhu <wu...@126.com>

Closes #448 from wujinhu/deploy.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/45ff4b8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/45ff4b8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/45ff4b8f

Branch: refs/heads/master
Commit: 45ff4b8f58f9de4a7f4f01b452ba4ca5071cde0c
Parents: ad6a534
Author: wujinhu <wu...@126.com>
Authored: Tue Sep 27 11:37:06 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Tue Sep 27 11:37:06 2016 +0800

----------------------------------------------------------------------
 .../environment/impl/StormExecutionRuntime.java | 14 ++++++------
 .../impl/ApplicationManagementServiceImpl.java  |  5 +---
 .../app/spi/ApplicationXMLDescriptorLoader.java |  2 ++
 .../eagle/metadata/model/ApplicationDesc.java   | 13 +++++++++--
 eagle-core/eagle-query/eagle-query-base/pom.xml |  5 ++++
 .../jpm/mr/history/MRHistoryJobConfig.java      |  8 -------
 .../mr/history/parser/JHFEventReaderBase.java   |  3 ++-
 ....history.MRHistoryJobApplicationProvider.xml |  1 -
 .../src/main/resources/application.conf         |  3 ---
 .../jpm/mr/running/MRRunningJobApplication.java |  8 ++++++-
 .../jpm/mr/running/MRRunningJobConfig.java      |  8 -------
 ....running.MRRunningJobApplicationProvider.xml |  1 -
 .../src/main/resources/application.conf         | 24 ++------------------
 eagle-jpm/eagle-jpm-util/pom.xml                |  5 ++++
 .../org/apache/eagle/jpm/util/HDFSUtil.java     |  3 +++
 .../jpm/util/resourcefetch/model/MrJobs.java    |  2 +-
 eagle-topology-assembly/pom.xml                 | 15 ++++++++++++
 ...org.apache.eagle.app.spi.ApplicationProvider |  2 ++
 .../services/org.apache.hadoop.fs.FileSystem    | 20 ++++++++++++++++
 19 files changed, 83 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 1d0f812..6d12494 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -67,7 +67,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
 
     private static final String WORKERS = "workers";
 
-    private backtype.storm.Config getStormConfig() {
+    private backtype.storm.Config getStormConfig(com.typesafe.config.Config config) {
         backtype.storm.Config conf = new backtype.storm.Config();
         conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
         conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
@@ -92,12 +92,12 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
         conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
         conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "backtype.storm.security.auth.SimpleTransportPlugin");
-        if (environment.config().hasPath(WORKERS)) {
-            conf.setNumWorkers(environment.config().getInt(WORKERS));
+        if (config.hasPath(WORKERS)) {
+            conf.setNumWorkers(config.getInt(WORKERS));
         }
 
-        if (environment.config().hasPath(TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
-            conf.put(TOPOLOGY_MESSAGE_TIMEOUT_SECS, environment.config().getInt(TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+        if (config.hasPath(TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+            conf.put(TOPOLOGY_MESSAGE_TIMEOUT_SECS, config.getInt(TOPOLOGY_MESSAGE_TIMEOUT_SECS));
         }
         return conf;
     }
@@ -108,7 +108,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         Preconditions.checkNotNull(topologyName,"[appId] is required by null for " + executor.getClass().getCanonicalName());
         StormTopology topology = executor.execute(config, environment);
         LOG.info("Starting {} ({}), mode: {}",topologyName, executor.getClass().getCanonicalName(), config.getString("mode"));
-        Config conf = getStormConfig();
+        Config conf = getStormConfig(config);
         if (ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))) {
             String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
             if (jarFile == null) {
@@ -139,7 +139,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         String appId = config.getString("appId");
         LOG.info("Stopping topology {} ..." + appId);
         if (Objects.equals(config.getString("mode"), ApplicationEntity.Mode.CLUSTER.name())) {
-            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
+            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig(config)).getClient();
             try {
                 stormClient.killTopology(appId);
             } catch (NotAliveException | TException e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
index fd8e650..ffede54 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -64,9 +64,6 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
     public ApplicationEntity install(ApplicationOperations.InstallOperation operation) throws EntityNotFoundException {
         Preconditions.checkNotNull(operation.getSiteId(), "siteId is null");
         Preconditions.checkNotNull(operation.getAppType(), "appType is null");
-        if (operation.getMode().equals(ApplicationEntity.Mode.CLUSTER)) {
-            Preconditions.checkNotNull(operation.getJarPath(), "jarPath is null when mode is CLUSTER");
-        }
         SiteEntity siteEntity = siteEntityService.getBySiteId(operation.getSiteId());
         Preconditions.checkNotNull(siteEntity, "Site with ID: " + operation.getSiteId() + " is not found");
         ApplicationDesc appDesc = applicationProviderService.getApplicationDescByType(operation.getAppType());
@@ -75,7 +72,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         applicationEntity.setDescriptor(appDesc);
         applicationEntity.setSite(siteEntity);
         applicationEntity.setMode(operation.getMode());
-        applicationEntity.setJarPath(operation.getJarPath());
+        applicationEntity.setJarPath(operation.getJarPath() == null ? appDesc.getJarPath() : operation.getJarPath());
         applicationEntity.ensureDefault();
 
         // Calculate application config based on:

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationXMLDescriptorLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationXMLDescriptorLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationXMLDescriptorLoader.java
index 2e8dbc3..84768ae 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationXMLDescriptorLoader.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationXMLDescriptorLoader.java
@@ -18,6 +18,7 @@ package org.apache.eagle.app.spi;
 
 import org.apache.eagle.app.Application;
 import org.apache.eagle.app.config.ApplicationProviderDescConfig;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 
 /**
@@ -54,6 +55,7 @@ class ApplicationXMLDescriptorLoader implements ApplicationDescLoader {
         applicationDesc.setVersion(descWrapperConfig.getVersion());
         applicationDesc.setName(descWrapperConfig.getName());
         applicationDesc.setDocs(descWrapperConfig.getDocs());
+        applicationDesc.setJarPath(DynamicJarPathFinder.findPath(applicationClass));
         if (applicationClass != null) {
             applicationDesc.setAppClass(applicationClass);
             if (!Application.class.isAssignableFrom(applicationDesc.getAppClass())) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationDesc.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationDesc.java
index 206cf73..506bdc6 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationDesc.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationDesc.java
@@ -30,6 +30,7 @@ public class ApplicationDesc implements Serializable {
     private String version;
     private String description;
     private Class<?> appClass;
+    private String jarPath;
     private String viewPath;
     private Class<?> providerClass;
     private Configuration configuration;
@@ -50,6 +51,10 @@ public class ApplicationDesc implements Serializable {
         return type;
     }
 
+    public String getJarPath() {
+        return jarPath;
+    }
+
     public Configuration getConfiguration() {
         return configuration;
     }
@@ -66,6 +71,10 @@ public class ApplicationDesc implements Serializable {
         this.type = type;
     }
 
+    public void setJarPath(String jarPath) {
+        this.jarPath = jarPath;
+    }
+
     public void setName(String name) {
         this.name = name;
     }
@@ -100,8 +109,8 @@ public class ApplicationDesc implements Serializable {
 
     @Override
     public String toString() {
-        return String.format("ApplicationDesc [type=%s, name=%s, version=%s, appClass=%s, viewPath=%s, providerClass=%s, configuration= %s properties, description=%s",
-            getType(), getName(), getVersion(), getAppClass(), getViewPath(), getProviderClass(), getConfiguration() == null ? 0 : getConfiguration().size(), getDescription());
+        return String.format("ApplicationDesc [type=%s, name=%s, version=%s, appClass=%s, viewPath=%s, jarpath=%s, providerClass=%s, configuration= %s properties, description=%s",
+            getType(), getName(), getVersion(), getAppClass(), getViewPath(),getJarPath(), getProviderClass(), getConfiguration() == null ? 0 : getConfiguration().size(), getDescription());
     }
 
     public void setConfiguration(Configuration configuration) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-core/eagle-query/eagle-query-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/pom.xml b/eagle-core/eagle-query/eagle-query-base/pom.xml
index 4f41319..f0eda4d 100644
--- a/eagle-core/eagle-query/eagle-query-base/pom.xml
+++ b/eagle-core/eagle-query/eagle-query-base/pom.xml
@@ -39,5 +39,10 @@
             <artifactId>eagle-entity-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
index 3bb0fda..839d2e4 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -37,12 +37,6 @@ public class MRHistoryJobConfig implements Serializable {
 
     private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf";
 
-    public String getEnv() {
-        return env;
-    }
-
-    private String env;
-
     public ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
@@ -150,7 +144,6 @@ public class MRHistoryJobConfig implements Serializable {
      */
     private void init(Config config) {
         this.config = config;
-        this.env = config.getString("envContextConfig.env");
         //parse eagle job extractor
         this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
         this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion");
@@ -193,7 +186,6 @@ public class MRHistoryJobConfig implements Serializable {
         this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
 
         LOG.info("Successfully initialized MRHistoryJobConfig");
-        LOG.info("env: " + this.env);
         LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum);
         LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort);
         LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index c87f4b2..d452f59 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -443,7 +443,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
                 jobExecutionEntity.getFailedTasks().put(taskID,
                     new HashMap<String, String>() {
                         {
-                            put(entity.getTags().get(MRJobTagName.ERROR_CATEGORY.toString()), entity.getError());
+                            put(entity.getTags().get(MRJobTagName.ERROR_CATEGORY.toString()),
+                                entity.getTags().get(MRJobTagName.ERROR_CATEGORY.toString()));//decide later
                         }
                     }
                 );

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index ae318ba..02e8c05 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -20,7 +20,6 @@
     <type>MR_HISTORY_JOB_APP</type>
     <name>Map Reduce History Job Monitoring</name>
     <version>0.5.0-incubating</version>
-    <appClass>org.apache.eagle.jpm.mr.history.MRHistoryJobApplication</appClass>
     <configuration>
         <!-- org.apache.eagle.jpm.mr.history.MRHistoryJobConfig -->
         <property>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 1debd06..1440227 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -15,9 +15,6 @@
 
 {
   "envContextConfig" : {
-    "env" : "local",
-    "topologyName" : "mrHistoryJob",
-    "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
       "mrHistoryJobExecutor" : 6
     },

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index 66e0f37..e8abf30 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -26,6 +26,8 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class MRRunningJobApplication extends StormApplication {
@@ -34,7 +36,11 @@ public class MRRunningJobApplication extends StormApplication {
         //1. trigger init conf
         MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.getInstance(config);
 
-        List<String> confKeyKeys = mrRunningJobConfig.getConfig().getStringList("MRConfigureKeys.jobConfigKey");
+        String[] confKeyPatternsSplit = mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobConfigKey").split(",");
+        List<String> confKeyKeys = new ArrayList<>(confKeyPatternsSplit.length);
+        for (String confKeyPattern : confKeyPatternsSplit) {
+            confKeyKeys.add(confKeyPattern.trim());
+        }
         confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
         confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
         confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index ec6740b..93bcd0c 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -29,12 +29,6 @@ import java.io.Serializable;
 public class MRRunningJobConfig implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobConfig.class);
 
-    public String getEnv() {
-        return env;
-    }
-
-    private String env;
-
     public ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
@@ -120,7 +114,6 @@ public class MRRunningJobConfig implements Serializable {
 
     private void init(Config config) {
         this.config = config;
-        this.env = config.getString("envContextConfig.env");
 
         //parse eagle zk
         this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
@@ -148,7 +141,6 @@ public class MRRunningJobConfig implements Serializable {
         this.endpointConfig.rmUrls = config.getString("dataSourceConfig.rmUrls").split(",");
 
         LOG.info("Successfully initialized MRRunningJobConfig");
-        LOG.info("env: " + this.env);
         LOG.info("site: " + this.jobExtractorConfig.site);
         LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
         LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
index 39e562c..51e9eb8 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
@@ -20,7 +20,6 @@
     <type>MR_RUNNING_JOB_APP</type>
     <name>MR Running Job Monitoring</name>
     <version>0.5.0-incubating</version>
-    <appClass>org.apache.eagle.jpm.mr.running.MRRunningJobApplication</appClass>
     <configuration>
         <!-- org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig -->
         <property>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
index 0d1de78..830c72b 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
@@ -19,8 +19,6 @@
   application.storm.nimbusHost=localhost,
   "workers" : 8,
   "envContextConfig" : {
-    "env" : "local",
-    "topologyName" : "mrRunningJob",
     "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
       "mrRunningJobFetchSpout" : 1,
@@ -67,24 +65,6 @@
 
   "MRConfigureKeys" : {
     "jobNameKey" : "eagle.job.name",
-    "jobConfigKey" : [
-      "mapreduce.map.output.compress",
-      "mapreduce.map.output.compress.codec",
-      "mapreduce.output.fileoutputformat.compress",
-      "mapreduce.output.fileoutputformat.compress.type",
-      "mapreduce.output.fileoutputformat.compress.codec",
-      "mapred.output.format.class",
-      "eagle.job.runid",
-      "eagle.job.runidfieldname",
-      "eagle.job.name",
-      "eagle.job.normalizedfieldname",
-      "eagle.alert.email",
-      "eagle.job.alertemailaddress",
-      "dataplatform.etl.info",
-      "mapreduce.map.memory.mb",
-      "mapreduce.reduce.memory.mb",
-      "mapreduce.map.java.opts",
-      "mapreduce.reduce.java.opts"
-    ]
+    "jobConfigKey" : "mapreduce.map.output.compress, mapreduce.map.output.compress.codec, mapreduce.output.fileoutputformat.compress, mapreduce.output.fileoutputformat.compress.type, mapreduce.output.fileoutputformat.compress.codec, mapred.output.format.class, eagle.job.runid, eagle.job.runidfieldname, eagle.job.name, eagle.job.normalizedfieldname, eagle.alert.email, eagle.job.alertemailaddress, dataplatform.etl.info, mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.java.opts, mapreduce.reduce.java.opts"
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-util/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/pom.xml b/eagle-jpm/eagle-jpm-util/pom.xml
index e424e49..0c6be01 100644
--- a/eagle-jpm/eagle-jpm-util/pom.xml
+++ b/eagle-jpm/eagle-jpm-util/pom.xml
@@ -68,5 +68,10 @@
             <artifactId>commons-codec</artifactId>
             <version>1.9</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
index b263c25..a7a396a 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
@@ -34,6 +34,9 @@ public class HDFSUtil {
 
     public static void login(Configuration kConfig) throws IOException {
         if (kConfig.get("hdfs.kerberos.principal") == null || kConfig.get("hdfs.kerberos.principal").isEmpty()) {
+            if (kConfig.get("hadoop.job.ugi") != null) {
+                System.setProperty("HADOOP_USER_NAME", kConfig.get("hadoop.job.ugi"));
+            }
             return;
         }
         kConfig.setBoolean("hadoop.security.authorization", true);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/MrJobs.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/MrJobs.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/MrJobs.java
index 1d4871c..ff2ab6d 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/MrJobs.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/MrJobs.java
@@ -31,7 +31,7 @@ public class MrJobs {
         return job;
     }
 
-    public void setJobs(List<MRJob> job) {
+    public void setJob(List<MRJob> job) {
         this.job = job;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 7e8aabb..2cbba46 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -34,6 +34,21 @@
             <artifactId>eagle-server</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-mr-history</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-mr-running</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-web</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
index 2701c42..989886f 100644
--- a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
+++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -16,3 +16,5 @@
 org.apache.eagle.security.hbase.HBaseAuditLogAppProvider
 org.apache.eagle.app.example.ExampleApplicationProvider
 org.apache.eagle.app.jpm.JPMWebApplicationProvider
+org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider
+org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/45ff4b8f/eagle-topology-assembly/src/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..21686a6
--- /dev/null
+++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DistributedFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
+org.apache.hadoop.hdfs.web.WebHdfsFileSystem
+org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
\ No newline at end of file