You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/20 10:55:02 UTC

incubator-eagle git commit: [EAGLE-653] remove zk root from configure

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 605c8b21a -> d76fa311b


[EAGLE-653] remove zk root from configure

Author: wujinhu <wu...@126.com>

Closes #542 from wujinhu/EAGLE-653.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/d76fa311
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/d76fa311
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/d76fa311

Branch: refs/heads/master
Commit: d76fa311b2f8c5e3a51ef12700807f01c8f2e98c
Parents: 605c8b2
Author: wujinhu <wu...@126.com>
Authored: Thu Oct 20 18:54:51 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Thu Oct 20 18:54:51 2016 +0800

----------------------------------------------------------------------
 .../jpm/aggregation/AggregationConfig.java      |  4 +-
 ...gregation.AggregationApplicationProvider.xml |  8 ----
 .../MRHistoryJobApplicationListener.java        | 46 ++++++++++++++++++
 .../MRHistoryJobApplicationProvider.java        |  8 ++++
 .../jpm/mr/history/MRHistoryJobConfig.java      |  4 +-
 ....history.MRHistoryJobApplicationProvider.xml | 33 +++++--------
 .../MRRunningJobApplicationListener.java        | 49 ++++++++++++++++++++
 .../MRRunningJobApplicationProvider.java        |  8 ++++
 .../jpm/mr/running/MRRunningJobConfig.java      |  4 +-
 .../jpm/mr/running/parser/MRJobParser.java      | 45 +++++++-----------
 ....running.MRRunningJobApplicationProvider.xml |  9 ----
 11 files changed, 149 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
index c246885..e17c5b6 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/AggregationConfig.java
@@ -26,6 +26,8 @@ import java.io.Serializable;
 public class AggregationConfig implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(AggregationConfig.class);
 
+    private static final String ZK_ROOT_PREFIX = "/apps/mr/aggregation";
+
     public ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
@@ -105,7 +107,7 @@ public class AggregationConfig implements Serializable {
         this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
         this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
         this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
-        this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
+        this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + this.stormConfig.site;
 
         // parse eagle service endpoint
         this.eagleServiceConfig.eagleServiceHost = config.getString("service.host");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
index 956f134..29ca98a 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/META-INF/providers/org.apache.eagle.jpm.aggregation.AggregationApplicationProvider.xml
@@ -52,14 +52,6 @@
             <description>seconds, each bolt process metrics from [start, start + aggregationDuration]</description>
             <value>3600</value>
         </property>
-
-        <property>
-            <name>zookeeper.zkRoot</name>
-            <value>/aggregation/mr/sandbox</value>
-            <displayName>Zookeeper Root</displayName>
-            <description>zkRoot that used to save context for this application</description>
-            <required>true</required>
-        </property>
         <property>
             <name>aggregate.counters.metrics</name>
             <value>cpu_milliseconds, file_bytes_read, file_bytes_written, hdfs_bytes_read, hdfs_bytes_written, hdfs_read_ops, hdfs_write_ops</value>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationListener.java
new file mode 100644
index 0000000..4bfe0a4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationListener.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.history;
+
+import org.apache.eagle.app.service.ApplicationListener;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+
+public class MRHistoryJobApplicationListener implements ApplicationListener {
+    private ApplicationEntity applicationEntity;
+
+    @Override
+    public void init(ApplicationEntity applicationEntity) {
+        this.applicationEntity = applicationEntity;
+    }
+
+    @Override
+    public void afterInstall() {
+    }
+
+    @Override
+    public void afterUninstall() {
+    }
+
+    @Override
+    public void beforeStart() {
+    }
+
+    @Override
+    public void afterStop() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
index 9aa1c61..774aa8a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
@@ -16,11 +16,19 @@
  */
 package org.apache.eagle.jpm.mr.history;
 
+import org.apache.eagle.app.service.ApplicationListener;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
+import java.util.Optional;
+
 public class MRHistoryJobApplicationProvider extends AbstractApplicationProvider<MRHistoryJobApplication> {
     @Override
     public MRHistoryJobApplication getApplication() {
         return new MRHistoryJobApplication();
     }
+
+    @Override
+    public Optional<ApplicationListener> getApplicationListener() {
+        return Optional.of(new MRHistoryJobApplicationListener());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
index 566f7d0..496aa77 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -35,7 +35,7 @@ import java.util.Map;
 public class MRHistoryJobConfig implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(MRHistoryJobConfig.class);
 
-    private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf";
+    private static final String ZK_ROOT_PREFIX = "/apps/mr/history";
 
     public ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
@@ -123,7 +123,7 @@ public class MRHistoryJobConfig implements Serializable {
         this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
         this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
         this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
-        this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
+        this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + config.getString("siteId");
 
         //parse job history endpoint
         this.jobHistoryEndpointConfig.site = config.getString("siteId");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index 6f44b3d..7df445f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -34,20 +34,18 @@
             <description>the number tasks of the spout will be assigned</description>
             <value>4</value>
         </property>
-
         <property>
-            <name>zookeeper.zkRoot</name>
-            <displayName>Zookeeper Root Path</displayName>
-            <description>zkRoot that used to save context for this application</description>
-            <value>/mrjobhistory_sandbox</value>
+            <name>endpointConfig.hdfs.fs.defaultFS</name>
+            <displayName>HDFS Address</displayName>
+            <description>The name of the default file system.  Either the literal string "local" or a host:port for NDFS</description>
+            <value>hdfs://sandbox.hortonworks.com:8020</value>
             <required>true</required>
         </property>
-
         <property>
-            <name>endpointConfig.timeZone</name>
-            <displayName>Time Zone</displayName>
-            <description>which time zone do hdfs data nodes locate in</description>
-            <value>Etc/GMT+7</value>
+            <name>endpointConfig.basePath</name>
+            <displayName>Map Reduce History Log File Path</displayName>
+            <description>which directory do map reduce history job files locate in</description>
+            <value>/mapred/history/done</value>
             <required>true</required>
         </property>
         <property>
@@ -58,17 +56,10 @@
             <required>true</required>
         </property>
         <property>
-            <name>endpointConfig.basePath</name>
-            <displayName>Map Reduce History Log File Path</displayName>
-            <description>which directory do map reduce history job files locate in</description>
-            <value>/mr-history/done</value>
-            <required>true</required>
-        </property>
-        <property>
-            <name>endpointConfig.hdfs.fs.defaultFS</name>
-            <displayName>HDFS Address</displayName>
-            <description>The name of the default file system.  Either the literal string "local" or a host:port for NDFS</description>
-            <value>hdfs://sandbox.hortonworks.com:8020</value>
+            <name>endpointConfig.timeZone</name>
+            <displayName>Time Zone</displayName>
+            <description>which time zone do hdfs data nodes locate in</description>
+            <value>Etc/GMT+7</value>
             <required>true</required>
         </property>
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationListener.java
new file mode 100644
index 0000000..d9b4dc9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationListener.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.running;
+
+import org.apache.eagle.app.service.ApplicationListener;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+
+public class MRRunningJobApplicationListener implements ApplicationListener {
+    private ApplicationEntity applicationEntity;
+
+    @Override
+    public void init(ApplicationEntity applicationEntity) {
+        this.applicationEntity = applicationEntity;
+    }
+
+    @Override
+    public void afterInstall() {
+    }
+
+    @Override
+    public void afterUninstall() {
+    }
+
+    @Override
+    public void beforeStart() {
+    }
+
+    @Override
+    public void afterStop() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
index 45a841b..5a57aca 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
@@ -16,11 +16,19 @@
  */
 package org.apache.eagle.jpm.mr.running;
 
+import org.apache.eagle.app.service.ApplicationListener;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
+import java.util.Optional;
+
 public class MRRunningJobApplicationProvider extends AbstractApplicationProvider<MRRunningJobApplication> {
     @Override
     public MRRunningJobApplication getApplication() {
         return new MRRunningJobApplication();
     }
+
+    @Override
+    public Optional<ApplicationListener> getApplicationListener() {
+        return Optional.of(new MRRunningJobApplicationListener());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index 2fe91d2..10d8f76 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -29,6 +29,8 @@ import java.io.Serializable;
 public class MRRunningJobConfig implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobConfig.class);
 
+    private static final String ZK_ROOT_PREFIX = "/apps/mr/running";
+
     public ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
@@ -107,7 +109,7 @@ public class MRRunningJobConfig implements Serializable {
         this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
         this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
         this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
-        this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
+        this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + config.getString("siteId");
 
         // parse eagle service endpoint
         this.eagleServiceConfig.eagleServiceHost = config.getString("service.host");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index edcb9f7..0abf9a0 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -76,8 +76,7 @@ public class MRJobParser implements Runnable {
     private ResourceFetcher rmResourceFetcher;
     private Set<String> finishedTaskIds;
     private List<String> configKeys;
-    private MRRunningJobConfig.EndpointConfig endpointConfig;
-    private static final int TOP_BOTTOM_TASKS_BY_ELASPED_TIME = 10;
+    private static final int TOP_BOTTOM_TASKS_BY_ELAPSED_TIME = 10;
     private static final int FLUSH_TASKS_EVERY_TIME = 5;
 
     static {
@@ -89,7 +88,6 @@ public class MRJobParser implements Runnable {
                        AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
                        MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher,
                        List<String> configKeys) {
-        this.endpointConfig = endpointConfig;
         this.app = app;
         this.mrJobEntityMap = new HashMap<>();
         this.mrJobEntityMap = mrJobMap;
@@ -392,6 +390,17 @@ public class MRJobParser implements Runnable {
         return null;
     };
 
+    private void needFetchAttemptTasks(Iterator<MRTask> taskIterator, Set<String> needFetchAttemptTasks) {
+        int i = 0;
+        while (taskIterator.hasNext() && i < TOP_BOTTOM_TASKS_BY_ELAPSED_TIME) {
+            MRTask mrTask = taskIterator.next();
+            if (mrTask.getElapsedTime() > 0) {
+                i++;
+                needFetchAttemptTasks.add(mrTask.getId());
+            }
+        }
+    }
+
     private Set<String> calcFetchCounterAndAttemptTaskId(List<MRTask> tasks) {
         Set<String> needFetchAttemptTasks = new HashSet<>();
         //1, sort by elapsedTime
@@ -401,38 +410,20 @@ public class MRJobParser implements Runnable {
         Iterator<MRTask> taskIteratorIncrease = tasks.stream()
             .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
             .sorted(byElapsedTimeIncrease).iterator();
-        int i = 0;
-        while (taskIteratorIncrease.hasNext() && i < TOP_BOTTOM_TASKS_BY_ELASPED_TIME) {
-            MRTask mrTask = taskIteratorIncrease.next();
-            if (mrTask.getElapsedTime() > 0) {
-                i++;
-                needFetchAttemptTasks.add(mrTask.getId());
-            }
-        }
+        needFetchAttemptTasks(taskIteratorIncrease, needFetchAttemptTasks);
+
         //3, fetch finished top n
         Iterator<MRTask> taskIteratorDecrease = tasks.stream()
             .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
             .sorted(byElapsedTimeDecrease).iterator();
-        i = 0;
-        while (taskIteratorDecrease.hasNext() && i < TOP_BOTTOM_TASKS_BY_ELASPED_TIME) {
-            MRTask mrTask = taskIteratorDecrease.next();
-            if (mrTask.getElapsedTime() > 0) {
-                i++;
-                needFetchAttemptTasks.add(mrTask.getId());
-            }
-        }
+        needFetchAttemptTasks(taskIteratorDecrease, needFetchAttemptTasks);
+
         //4, fetch running top n
         taskIteratorDecrease = tasks.stream()
             .filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
             .sorted(byElapsedTimeDecrease).iterator();
-        i = 0;
-        while (taskIteratorDecrease.hasNext() && i < TOP_BOTTOM_TASKS_BY_ELASPED_TIME) {
-            MRTask mrTask = taskIteratorDecrease.next();
-            if (mrTask.getElapsedTime() > 0) {
-                i++;
-                needFetchAttemptTasks.add(mrTask.getId());
-            }
-        }
+        needFetchAttemptTasks(taskIteratorDecrease, needFetchAttemptTasks);
+
         return needFetchAttemptTasks;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d76fa311/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
index 2e91468..caa6f3e 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
@@ -39,15 +39,6 @@
             <description>number of tasks to parse map reduce running jobs got from resource manager</description>
             <value>5</value>
         </property>
-
-        <property>
-            <name>zookeeper.zkRoot</name>
-            <displayName>Zookeeper Root Path</displayName>
-            <description>zkRoot that used to save context for this application</description>
-            <value>/apps/mr/runningSandbox</value>
-            <required>true</required>
-        </property>
-
         <property>
             <name>endpointConfig.fetchRunningJobInterval</name>
             <displayName>Interval of Fetch Running Job From Resource Manager</displayName>