You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:14 UTC

[18/52] [abbrv] incubator-eagle git commit: [EAGLE-501] add fields and fix bugs

[EAGLE-501] add fields and fix bugs

Author: wujinhu <wu...@126.com>

Closes #388 from wujinhu/EAGLE-501.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7f372671
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7f372671
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7f372671

Branch: refs/heads/master
Commit: 7f37267160af48b5cb46b4bcbd85a3a82957b192
Parents: a846c40
Author: wujinhu <wu...@126.com>
Authored: Thu Aug 25 23:46:52 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Aug 25 23:46:52 2016 +0800

----------------------------------------------------------------------
 .../environment/impl/StormExecutionRuntime.java |  6 ++-
 .../mr/historyentity/JobExecutionAPIEntity.java | 44 ++++++++++++++++
 .../TaskAttemptExecutionAPIEntity.java          |  2 +-
 .../mr/runningentity/JobExecutionAPIEntity.java | 11 ++++
 eagle-jpm/eagle-jpm-mr-history/pom.xml          | 11 ++--
 .../jpm/mr/history/parser/EagleJobStatus.java   |  1 +
 .../mr/history/parser/JHFEventReaderBase.java   | 27 +++++++++-
 .../src/main/resources/application.conf         | 11 ++--
 .../jpm/mr/running/parser/MRJobParser.java      | 12 +++--
 .../mr/running/storm/MRRunningJobParseBolt.java |  1 +
 .../src/main/resources/application.conf         | 55 +++++++++++---------
 11 files changed, 141 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 83d3592..1b989ac 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -60,6 +60,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
     private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
     private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
     private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
+    private static final String WORKERS = "workers";
 
     public backtype.storm.Config getStormConfig(){
         backtype.storm.Config conf = new backtype.storm.Config();
@@ -86,6 +87,9 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
         conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
         conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "backtype.storm.security.auth.SimpleTransportPlugin");
+        if (environment.config().hasPath(WORKERS)) {
+            conf.setNumWorkers(environment.config().getInt(WORKERS));
+        }
         return conf;
     }
 
@@ -96,7 +100,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         StormTopology topology = executor.execute(config, environment);
         LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
         Config conf = getStormConfig();
-        if(config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()){
+        if(config.getString("mode").equals(ApplicationEntity.Mode.CLUSTER.name())){
             String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
             if(jarFile == null){
                 jarFile = DynamicJarPathFinder.findPath(executor.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
index 1f75f07..97e77b2 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
@@ -81,6 +81,14 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
     private double maxMapTaskDuration;
     @Column("x")
     private double maxReduceTaskDuration;
+    @Column("y")
+    private int totalMapAttempts;
+    @Column("z")
+    private int failedMapAttempts;
+    @Column("ab")
+    private int totalReduceAttempts;
+    @Column("ac")
+    private int failedReduceAttempts;
 
     public long getDurationTime() {
         return durationTime;
@@ -288,4 +296,40 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
         this.maxReduceTaskDuration = maxReduceTaskDuration;
         valueChanged("maxReduceTaskDuration");
     }
+
+    public int getTotalMapAttempts() {
+        return totalMapAttempts;
+    }
+
+    public void setTotalMapAttempts(int totalMapAttempts) {
+        this.totalMapAttempts = totalMapAttempts;
+        valueChanged("totalMapAttempts");
+    }
+
+    public int getFailedMapAttempts() {
+        return failedMapAttempts;
+    }
+
+    public void setFailedMapAttempts(int failedMapAttempts) {
+        this.failedMapAttempts = failedMapAttempts;
+        valueChanged("failedMapAttempts");
+    }
+
+    public int getTotalReduceAttempts() {
+        return totalReduceAttempts;
+    }
+
+    public void setTotalReduceAttempts(int totalReduceAttempts) {
+        this.totalReduceAttempts = totalReduceAttempts;
+        valueChanged("totalReduceAttempts");
+    }
+
+    public int getFailedReduceAttempts() {
+        return failedReduceAttempts;
+    }
+
+    public void setFailedReduceAttempts(int failedReduceAttempts) {
+        this.failedReduceAttempts = failedReduceAttempts;
+        valueChanged("failedReduceAttempts");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
index 620ee1f..fd96828 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
@@ -31,7 +31,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 @TimeSeries(true)
 @Partition({"site"})
 @Indexes({
-    @Index(name = "Index_1_jobId", columns = { "jobID" }, unique = false)
+    @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = false)
     })
 public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
     @Column("a")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
index 86b6554..dd81eb4 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
@@ -110,6 +110,8 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
     private int totalLaunchedMaps;
     @Column("aj")
     private long submissionTime;
+    @Column("ak")
+    private String internalState;
 
     public JobConfig getJobConfig() {
         return jobConfig;
@@ -434,4 +436,13 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
         this.submissionTime = submissionTime;
         valueChanged("submissionTime");
     }
+
+    public String getInternalState() {
+        return internalState;
+    }
+
+    public void setInternalState(String internalState) {
+        this.internalState = internalState;
+        valueChanged("internalState");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index 1ffda6a..5732237 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -56,6 +56,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-app</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-data-process</artifactId>
             <version>${project.version}</version>
             <exclusions>
@@ -104,13 +109,7 @@
             <artifactId>hadoop-mapreduce-client-core</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-app-base</artifactId>
-            <version>${project.version}</version>
-        </dependency>
     </dependencies>
-
     <build>
         <resources>
             <resource>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
index 0a137be..fb218e3 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
@@ -24,5 +24,6 @@ public enum EagleJobStatus {
     PREP,
     RUNNING,
     SUCCESS,
+    KILLED,
     FAILED;
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 5d3d5b4..6916aad 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -111,6 +111,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
 
         jobExecutionEntity = new JobExecutionAPIEntity();
         jobExecutionEntity.setTags(new HashMap<>(baseTags));
+        jobExecutionEntity.setNumFailedMaps(0);
+        jobExecutionEntity.setNumFailedReduces(0);
 
         taskRunningHosts = new HashMap<>();
 
@@ -350,7 +352,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             if (values.get(Keys.COUNTERS) != null || counters != null) {
                 entity.setJobCounters(parseCounters(counters));
             }
-            long duration = entity.getEndTime() - jobSubmitEventEntity.getTimestamp();
+            long duration = entity.getEndTime() - jobLaunchTime;
             if (taskType.equals(Constants.TaskType.MAP.toString()) && duration > jobExecutionEntity.getLastMapDuration()) {
                 jobExecutionEntity.setLastMapDuration(duration);
             }
@@ -367,8 +369,16 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
 
             if (taskType.equals(Constants.TaskType.MAP.toString())) {
                 this.sumMapTaskDuration += entity.getDuration();
+                if (entity.getTaskStatus().equals(EagleTaskStatus.FAILED.name())
+                    || entity.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+                    jobExecutionEntity.setNumFailedMaps(1 + jobExecutionEntity.getNumFailedMaps());
+                }
             } else {
                 this.sumReduceTaskDuration += entity.getDuration();
+                if (entity.getTaskStatus().equals(EagleTaskStatus.FAILED.name())
+                    || entity.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+                    jobExecutionEntity.setNumFailedReduces(1 + jobExecutionEntity.getNumFailedReduces());
+                }
             }
 
             entityCreated(entity);
@@ -402,6 +412,21 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
                 entity.setJobCounters(parseCounters(counters));
             }
             entity.setTaskAttemptID(taskAttemptID);
+
+            if (recType == RecordTypes.MapAttempt) {
+                jobExecutionEntity.setTotalMapAttempts(1 + jobExecutionEntity.getTotalMapAttempts());
+                if (entity.getTaskStatus().equals(EagleTaskStatus.FAILED.name())
+                    || entity.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+                    jobExecutionEntity.setFailedMapAttempts(1 + jobExecutionEntity.getFailedMapAttempts());
+                }
+            } else {
+                jobExecutionEntity.setTotalReduceAttempts(1 + jobExecutionEntity.getTotalReduceAttempts());
+                if (entity.getTaskStatus().equals(EagleTaskStatus.FAILED.name())
+                    || entity.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+                    jobExecutionEntity.setFailedReduceAttempts(1 + jobExecutionEntity.getFailedReduceAttempts());
+                }
+            }
+
             entityCreated(entity);
             taskAttemptStartTime.remove(taskAttemptID);
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 13e411f..db2c716 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -15,13 +15,15 @@
 
 {
   "envContextConfig" : {
+    "env" : "local",
+    "topologyName" : "mrHistoryJob",
+    "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
       "mrHistoryJobExecutor" : 6
     },
     "tasks" : {
       "mrHistoryJobExecutor" : 6
-    },
-    "workers" : 3
+    }
   },
 
   "jobExtractorConfig" : {
@@ -59,8 +61,11 @@
       "password": "secret"
     }
   },
-  "appId":"mr_history",
+
+  "appId":"mrHistoryJob",
   "mode":"LOCAL",
+  "workers" : 3,
+  application.storm.nimbusHost=localhost
   "MRConfigureKeys" : {
     "jobNameKey" : "eagle.job.name",
     "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 9148c0c..9e156fa 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -110,6 +110,10 @@ public class MRJobParser implements Runnable {
         this.configKeys = configKeys;
     }
 
+    public void setAppInfo(AppInfo app) {
+        this.app = app;
+    }
+
     public ParserStatus status() {
         return this.parserStatus;
     }
@@ -120,7 +124,8 @@ public class MRJobParser implements Runnable {
 
     private void finishMRJob(String mrJobId) {
         JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(mrJobId);
-        jobExecutionAPIEntity.setCurrentState(Constants.AppState.FINISHED.toString());
+        jobExecutionAPIEntity.setInternalState(Constants.AppState.FINISHED.toString());
+        jobExecutionAPIEntity.setCurrentState(Constants.AppState.RUNNING.toString());
         mrJobConfigs.remove(mrJobId);
         if (mrJobConfigs.size() == 0) {
             this.parserStatus = ParserStatus.APP_FINISHED;
@@ -205,6 +210,7 @@ public class MRJobParser implements Runnable {
             jobExecutionAPIEntity.setStartTime(mrJob.getStartTime());
             jobExecutionAPIEntity.setDurationTime(mrJob.getElapsedTime());
             jobExecutionAPIEntity.setCurrentState(mrJob.getState());
+            jobExecutionAPIEntity.setInternalState(mrJob.getState());
             jobExecutionAPIEntity.setNumTotalMaps(mrJob.getMapsTotal());
             jobExecutionAPIEntity.setMapsCompleted(mrJob.getMapsCompleted());
             jobExecutionAPIEntity.setNumTotalReduces(mrJob.getReducesTotal());
@@ -562,8 +568,8 @@ public class MRJobParser implements Runnable {
                     mrJobEntityMap.keySet()
                         .stream()
                         .filter(
-                            jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString())
-                                || mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString()))
+                            jobId -> mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FINISHED.toString())
+                                || mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FAILED.toString()))
                         .forEach(
                             jobId -> this.runningJobManager.delete(app.getId(), jobId));
                 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 3174eb1..e918597 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -88,6 +88,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
             LOG.info("create application parser for {}", appInfo.getId());
         } else {
             applicationParser = runningMRParsers.get(appInfo.getId());
+            applicationParser.setAppInfo(appInfo);
         }
 
         Set<String> runningParserIds = new HashSet<>(runningMRParsers.keySet());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
index 4b6d4fe..0d1de78 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
@@ -16,24 +16,27 @@
 {
   "appId":"mrRunningJob",
   "mode":"LOCAL",
+  application.storm.nimbusHost=localhost,
+  "workers" : 8,
   "envContextConfig" : {
     "env" : "local",
+    "topologyName" : "mrRunningJob",
+    "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
       "mrRunningJobFetchSpout" : 1,
-      "mrRunningJobParseBolt" : 10
+      "mrRunningJobParseBolt" : 16
     },
     "tasks" : {
       "mrRunningJobFetchSpout" : 1,
-      "mrRunningJobParseBolt" : 10
-    },
-    "workers" : 5
+      "mrRunningJobParseBolt" : 16
+    }
   },
 
   "jobExtractorConfig" : {
     "site" : "sandbox",
     "fetchRunningJobInterval" : 60,
-    "parseJobThreadPoolSize" : 5, #job concurrent
-    "topAndBottomTaskByElapsedTime" : 50
+    "parseJobThreadPoolSize" : 6,
+    "topAndBottomTaskByElapsedTime" : 10
   },
 
   "zookeeperConfig" : {
@@ -44,9 +47,11 @@
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000
   },
+
   "dataSourceConfig" : {
     "rmUrls": "http://sandbox.hortonworks.com:50030"
   },
+
   "eagleProps" : {
     "mailHost" : "abc.com",
     "mailDebug" : "true",
@@ -63,23 +68,23 @@
   "MRConfigureKeys" : {
     "jobNameKey" : "eagle.job.name",
     "jobConfigKey" : [
-    "mapreduce.map.output.compress",
-    "mapreduce.map.output.compress.codec",
-    "mapreduce.output.fileoutputformat.compress",
-    "mapreduce.output.fileoutputformat.compress.type",
-    "mapreduce.output.fileoutputformat.compress.codec",
-    "mapred.output.format.class",
-    "eagle.job.runid",
-    "eagle.job.runidfieldname",
-    "eagle.job.name",
-    "eagle.job.normalizedfieldname",
-    "eagle.alert.email",
-    "eagle.job.alertemailaddress",
-    "dataplatform.etl.info",
-    "mapreduce.map.memory.mb",
-    "mapreduce.reduce.memory.mb",
-    "mapreduce.map.java.opts",
-    "mapreduce.reduce.java.opts"
-  ]
+      "mapreduce.map.output.compress",
+      "mapreduce.map.output.compress.codec",
+      "mapreduce.output.fileoutputformat.compress",
+      "mapreduce.output.fileoutputformat.compress.type",
+      "mapreduce.output.fileoutputformat.compress.codec",
+      "mapred.output.format.class",
+      "eagle.job.runid",
+      "eagle.job.runidfieldname",
+      "eagle.job.name",
+      "eagle.job.normalizedfieldname",
+      "eagle.alert.email",
+      "eagle.job.alertemailaddress",
+      "dataplatform.etl.info",
+      "mapreduce.map.memory.mb",
+      "mapreduce.reduce.memory.mb",
+      "mapreduce.map.java.opts",
+      "mapreduce.reduce.java.opts"
+    ]
   }
-}
+}
\ No newline at end of file