You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/19 04:05:18 UTC
incubator-eagle git commit: [EAGLE-634] clean up configuration for MR
running feeder
Repository: incubator-eagle
Updated Branches:
refs/heads/master 6dbdb4f72 -> 71a4bb013
[EAGLE-634] clean up configuration for MR running feeder
Author: wujinhu <wu...@126.com>
Closes #530 from wujinhu/EAGLE-634.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/71a4bb01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/71a4bb01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/71a4bb01
Branch: refs/heads/master
Commit: 71a4bb013c2acabb54e03d8988f5b07c4923384c
Parents: 6dbdb4f
Author: wujinhu <wu...@126.com>
Authored: Wed Oct 19 12:05:09 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Wed Oct 19 12:05:09 2016 +0800
----------------------------------------------------------------------
eagle-jpm/eagle-jpm-mr-running/pom.xml | 15 ---
.../jpm/mr/running/MRRunningJobApplication.java | 26 ++--
.../jpm/mr/running/MRRunningJobConfig.java | 51 +++-----
.../parser/MRJobEntityCreationHandler.java | 3 +-
.../jpm/mr/running/parser/MRJobParser.java | 15 +--
.../running/storm/MRRunningJobFetchSpout.java | 7 +-
.../mr/running/storm/MRRunningJobParseBolt.java | 7 +-
....running.MRRunningJobApplicationProvider.xml | 122 +++++--------------
.../src/main/resources/application.conf | 47 +++----
9 files changed, 88 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml
index aff1a90..34dcedc 100644
--- a/eagle-jpm/eagle-jpm-mr-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -37,21 +37,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-data-process</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.wso2.orbit.com.lmax</groupId>
- <artifactId>disruptor</artifactId>
- </exclusion>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index e8abf30..16e8ea7 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -27,7 +27,6 @@ import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
public class MRRunningJobApplication extends StormApplication {
@@ -51,33 +50,24 @@ public class MRRunningJobApplication extends StormApplication {
TopologyBuilder topologyBuilder = new TopologyBuilder();
String spoutName = "mrRunningJobFetchSpout";
String boltName = "mrRunningJobParseBolt";
- int parallelism = mrRunningJobConfig.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
- int tasks = mrRunningJobConfig.getConfig().getInt("envContextConfig.tasks." + spoutName);
- if (parallelism > tasks) {
- parallelism = tasks;
- }
+ int tasks = mrRunningJobConfig.getConfig().getInt("stormConfig." + spoutName + "Tasks");
+
topologyBuilder.setSpout(
spoutName,
- new MRRunningJobFetchSpout(
- mrRunningJobConfig.getJobExtractorConfig(),
- mrRunningJobConfig.getEndpointConfig(),
- mrRunningJobConfig.getZkStateConfig()),
- parallelism
+ new MRRunningJobFetchSpout(mrRunningJobConfig.getEndpointConfig(),
+ mrRunningJobConfig.getZkStateConfig()),
+ tasks
).setNumTasks(tasks);
- parallelism = mrRunningJobConfig.getConfig().getInt("envContextConfig.parallelismConfig." + boltName);
- tasks = mrRunningJobConfig.getConfig().getInt("envContextConfig.tasks." + boltName);
- if (parallelism > tasks) {
- parallelism = tasks;
- }
+ tasks = mrRunningJobConfig.getConfig().getInt("stormConfig." + boltName + "Tasks");
+
topologyBuilder.setBolt(boltName,
new MRRunningJobParseBolt(
mrRunningJobConfig.getEagleServiceConfig(),
mrRunningJobConfig.getEndpointConfig(),
- mrRunningJobConfig.getJobExtractorConfig(),
mrRunningJobConfig.getZkStateConfig(),
confKeyKeys),
- parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
+ tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
return topologyBuilder.createTopology();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index 93bcd0c..2fe91d2 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -41,12 +41,6 @@ public class MRRunningJobConfig implements Serializable {
private EagleServiceConfig eagleServiceConfig;
- public JobExtractorConfig getJobExtractorConfig() {
- return jobExtractorConfig;
- }
-
- private JobExtractorConfig jobExtractorConfig;
-
public EndpointConfig getEndpointConfig() {
return endpointConfig;
}
@@ -59,27 +53,21 @@ public class MRRunningJobConfig implements Serializable {
public int zkSessionTimeoutMs;
public int zkRetryTimes;
public int zkRetryInterval;
- public String zkPort;
}
public static class EagleServiceConfig implements Serializable {
public String eagleServiceHost;
public int eagleServicePort;
public int readTimeoutSeconds;
- public int maxFlushNum;
public String username;
public String password;
}
- public static class JobExtractorConfig implements Serializable {
+ public static class EndpointConfig implements Serializable {
public String site;
+ public String[] rmUrls;
public int fetchRunningJobInterval;
public int parseJobThreadPoolSize;
- public int topAndBottomTaskByElapsedTime;
- }
-
- public static class EndpointConfig implements Serializable {
- public String[] rmUrls;
}
public Config getConfig() {
@@ -92,7 +80,6 @@ public class MRRunningJobConfig implements Serializable {
private MRRunningJobConfig() {
this.eagleServiceConfig = new EagleServiceConfig();
- this.jobExtractorConfig = new JobExtractorConfig();
this.endpointConfig = new EndpointConfig();
this.zkStateConfig = new ZKStateConfig();
}
@@ -116,32 +103,28 @@ public class MRRunningJobConfig implements Serializable {
this.config = config;
//parse eagle zk
- this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
- this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
- this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
- this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
- this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
- this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+ this.zkStateConfig.zkQuorum = config.getString("zookeeper.zkQuorum");
+ this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
+ this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
+ this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
+ this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
// parse eagle service endpoint
- this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
- String port = config.getString("eagleProps.eagleService.port");
+ this.eagleServiceConfig.eagleServiceHost = config.getString("service.host");
+ String port = config.getString("service.port");
this.eagleServiceConfig.eagleServicePort = Integer.parseInt(port);
- this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
- this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
- this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
- this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
- //parse job extractor
- this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
- this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
- this.jobExtractorConfig.parseJobThreadPoolSize = config.getInt("jobExtractorConfig.parseJobThreadPoolSize");
- this.jobExtractorConfig.topAndBottomTaskByElapsedTime = config.getInt("jobExtractorConfig.topAndBottomTaskByElapsedTime");
+ this.eagleServiceConfig.username = config.getString("service.username");
+ this.eagleServiceConfig.password = config.getString("service.password");
+ this.eagleServiceConfig.readTimeoutSeconds = config.getInt("service.readTimeOutSeconds");
//parse data source config
- this.endpointConfig.rmUrls = config.getString("dataSourceConfig.rmUrls").split(",");
+ this.endpointConfig.rmUrls = config.getString("endpointConfig.rmUrls").split(",");
+ this.endpointConfig.site = config.getString("siteId");
+ this.endpointConfig.fetchRunningJobInterval = config.getInt("endpointConfig.fetchRunningJobInterval");
+ this.endpointConfig.parseJobThreadPoolSize = config.getInt("endpointConfig.parseJobThreadPoolSize");
LOG.info("Successfully initialized MRRunningJobConfig");
- LOG.info("site: " + this.jobExtractorConfig.site);
+ LOG.info("site: " + this.endpointConfig.site);
LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index 1a0fb61..ad80fd6 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -40,6 +40,7 @@ public class MRJobEntityCreationHandler {
private MRRunningJobConfig.EagleServiceConfig eagleServiceConfig;
private JobExecutionMetricsCreationListener jobMetricsListener;
private TaskExecutionMetricsCreationListener taskMetricsListener;
+ private static final int MAX_FLUSH_NUM = 1000;
public MRJobEntityCreationHandler(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig) {
this.eagleServiceConfig = eagleServiceConfig;
@@ -61,7 +62,7 @@ public class MRJobEntityCreationHandler {
metricEntities = jobMetricsListener.generateMetrics((JobExecutionAPIEntity) entity);
entities.addAll(metricEntities);
}
- if (entities.size() >= eagleServiceConfig.maxFlushNum) {
+ if (entities.size() >= MAX_FLUSH_NUM) {
this.flush();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index e5e3444..797bf21 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -77,18 +77,19 @@ public class MRJobParser implements Runnable {
private boolean first;
private Set<String> finishedTaskIds;
private List<String> configKeys;
- private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
+ private MRRunningJobConfig.EndpointConfig endpointConfig;
+ private static final int TOP_BOTTOM_TASKS_BY_ELASPED_TIME = 10;
static {
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
}
- public MRJobParser(MRRunningJobConfig.JobExtractorConfig jobExtractorConfig,
+ public MRJobParser(MRRunningJobConfig.EndpointConfig endpointConfig,
MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher,
List<String> configKeys) {
- this.jobExtractorConfig = jobExtractorConfig;
+ this.endpointConfig = endpointConfig;
this.app = app;
this.mrJobEntityMap = new HashMap<>();
this.mrJobEntityMap = mrJobMap;
@@ -99,7 +100,7 @@ public class MRJobParser implements Runnable {
this.mrJobEntityCreationHandler = new MRJobEntityCreationHandler(eagleServiceConfig);
- this.commonTags.put(MRJobTagName.SITE.toString(), jobExtractorConfig.site);
+ this.commonTags.put(MRJobTagName.SITE.toString(), endpointConfig.site);
this.commonTags.put(MRJobTagName.USER.toString(), app.getUser());
this.commonTags.put(MRJobTagName.JOB_QUEUE.toString(), app.getQueue());
this.runningJobManager = runningJobManager;
@@ -403,7 +404,7 @@ public class MRJobParser implements Runnable {
.filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
.sorted(byElapsedTimeIncrease).iterator();
int i = 0;
- while (taskIteratorIncrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
+ while (taskIteratorIncrease.hasNext() && i < TOP_BOTTOM_TASKS_BY_ELASPED_TIME) {
MRTask mrTask = taskIteratorIncrease.next();
if (mrTask.getElapsedTime() > 0) {
i++;
@@ -415,7 +416,7 @@ public class MRJobParser implements Runnable {
.filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
.sorted(byElapsedTimeDecrease).iterator();
i = 0;
- while (taskIteratorDecrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
+ while (taskIteratorDecrease.hasNext() && i < TOP_BOTTOM_TASKS_BY_ELASPED_TIME) {
MRTask mrTask = taskIteratorDecrease.next();
if (mrTask.getElapsedTime() > 0) {
i++;
@@ -427,7 +428,7 @@ public class MRJobParser implements Runnable {
.filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
.sorted(byElapsedTimeDecrease).iterator();
i = 0;
- while (taskIteratorDecrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
+ while (taskIteratorDecrease.hasNext() && i < TOP_BOTTOM_TASKS_BY_ELASPED_TIME) {
MRTask mrTask = taskIteratorDecrease.next();
if (mrTask.getElapsedTime() > 0) {
i++;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
index 27d1575..7c910e7 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -41,7 +41,6 @@ import java.util.*;
public class MRRunningJobFetchSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobFetchSpout.class);
- private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
private MRRunningJobConfig.EndpointConfig endpointConfig;
private MRRunningJobConfig.ZKStateConfig zkStateConfig;
private ResourceFetcher resourceFetcher;
@@ -50,10 +49,8 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
private transient MRRunningJobManager runningJobManager;
private Set<String> runningYarnApps;
- public MRRunningJobFetchSpout(MRRunningJobConfig.JobExtractorConfig jobExtractorConfig,
- MRRunningJobConfig.EndpointConfig endpointConfig,
+ public MRRunningJobFetchSpout(MRRunningJobConfig.EndpointConfig endpointConfig,
MRRunningJobConfig.ZKStateConfig zkStateConfig) {
- this.jobExtractorConfig = jobExtractorConfig;
this.endpointConfig = endpointConfig;
this.zkStateConfig = zkStateConfig;
this.init = false;
@@ -140,7 +137,7 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
} catch (Exception e) {
e.printStackTrace();
} finally {
- Utils.sleep(jobExtractorConfig.fetchRunningJobInterval);
+ Utils.sleep(endpointConfig.fetchRunningJobInterval);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 4e0cdbc..9ebc1a7 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -42,7 +42,6 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobParseBolt.class);
private MRRunningJobConfig.EndpointConfig endpointConfig;
- private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
private MRRunningJobConfig.ZKStateConfig zkStateConfig;
private ExecutorService executorService;
private Map<String, MRJobParser> runningMRParsers;
@@ -53,12 +52,10 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
MRRunningJobConfig.EndpointConfig endpointConfig,
- MRRunningJobConfig.JobExtractorConfig jobExtractorConfig,
MRRunningJobConfig.ZKStateConfig zkStateConfig,
List<String> configKeys) {
this.eagleServiceConfig = eagleServiceConfig;
this.endpointConfig = endpointConfig;
- this.jobExtractorConfig = jobExtractorConfig;
this.runningMRParsers = new HashMap<>();
this.zkStateConfig = zkStateConfig;
this.configKeys = configKeys;
@@ -66,7 +63,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- this.executorService = Executors.newFixedThreadPool(jobExtractorConfig.parseJobThreadPoolSize);
+ this.executorService = Executors.newFixedThreadPool(endpointConfig.parseJobThreadPoolSize);
this.runningJobManager = new MRRunningJobManager(zkStateConfig);
this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
@@ -81,7 +78,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
MRJobParser applicationParser;
if (!runningMRParsers.containsKey(appInfo.getId())) {
- applicationParser = new MRJobParser(jobExtractorConfig, eagleServiceConfig,
+ applicationParser = new MRJobParser(endpointConfig, eagleServiceConfig,
appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys);
runningMRParsers.put(appInfo.getId(), applicationParser);
LOG.info("create application parser for {}", appInfo.getId());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
index 4b95b36..4063b3a 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
@@ -18,122 +18,60 @@
<application>
<type>MR_RUNNING_JOB_APP</type>
- <name>MR Running Job Monitoring</name>
+ <name>Map Reduce Running Job Monitoring</name>
<version>0.5.0-incubating</version>
<configuration>
- <!-- org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig -->
- <property>
- <name>jobExtractorConfig.site</name>
- <displayName>Site ID</displayName>
- <value>sandbox</value>
- </property>
<property>
<name>workers</name>
- <displayName>storm worker number</displayName>
+ <displayName>Worker Number</displayName>
+ <description>the number of storm workers will be used</description>
<value>4</value>
</property>
<property>
- <name>envContextConfig.parallelismConfig.mrRunningJobFetchSpout</name>
- <value>1</value>
- </property>
- <property>
- <name>envContextConfig.parallelismConfig.mrRunningJobParseBolt</name>
- <value>8</value>
- </property>
- <property>
- <name>envContextConfig.tasks.mrRunningJobFetchSpout</name>
+ <name>stormConfig.mrRunningJobFetchSpoutTasks</name>
+ <displayName>Read Task Number</displayName>
+ <description>number of tasks to fetch map reduce running jobs from resource manager</description>
<value>1</value>
</property>
<property>
- <name>envContextConfig.tasks.mrRunningJobParseBolt</name>
- <value>8</value>
- </property>
- <property>
- <name>zookeeperConfig.zkQuorum</name>
- <displayName>zkQuorum</displayName>
- <description>Zookeeper Quorum</description>
- <value>sandbox.hortonworks.com:2181</value>
- </property>
- <property>
- <name>zookeeperConfig.zkPort</name>
- <displayName>zkPort</displayName>
- <description>Zookeeper Port</description>
- <value>2181</value>
- </property>
- <property>
- <name>zookeeperConfig.zkSessionTimeoutMs</name>
- <displayName>zkSessionTimeoutMs</displayName>
- <description>Zookeeper session timeoutMs</description>
- <value>15000</value>
- </property>
- <property>
- <name>zookeeperConfig.zkRetryTimes</name>
- <displayName>zkRetryTimes</displayName>
- <description>zookeeperConfig.zkRetryTimes</description>
- <value>3</value>
- </property>
- <property>
- <name>zookeeperConfig.zkRetryInterval</name>
- <displayName>zkRetryInterval</displayName>
- <description>zookeeperConfig.zkRetryInterval</description>
- <value>20000</value>
+ <name>stormConfig.mrRunningJobParseBoltTasks</name>
+ <displayName>Parse Task Number</displayName>
+ <description>number of tasks to parse map reduce running jobs got from resource manager</description>
+ <value>5</value>
</property>
+
<property>
- <name>zookeeperConfig.zkRoot</name>
+ <name>zookeeper.zkRoot</name>
+ <displayName>Zookeeper Root Path</displayName>
+ <description>zkRoot that used to save context for this application</description>
<value>/apps/mr/runningSandbox</value>
+ <required>true</required>
</property>
+
<property>
- <name>eagleProps.eagleService.host</name>
- <description>eagleProps.eagleService.host</description>
- <value>sandbox.hortonworks.com</value>
- </property>
- <property>
- <name>eagleProps.eagleService.port</name>
- <description>eagleProps.eagleService.port</description>
- <value>9099</value>
- </property>
- <property>
- <name>eagleProps.eagleService.username</name>
- <description>eagleProps.eagleService.username</description>
- <value>admin</value>
- </property>
- <property>
- <name>eagleProps.eagleService.password</name>
- <description>eagleProps.eagleService.password</description>
- <value>secret</value>
- </property>
- <property>
- <name>eagleProps.eagleService.readTimeOutSeconds</name>
- <description>eagleProps.eagleService.readTimeOutSeconds</description>
- <value>60</value>
- </property>
- <property>
- <name>eagleProps.eagleService.maxFlushNum</name>
- <description>eagleProps.eagleService.maxFlushNum</description>
- <value>1000</value>
- </property>
- <property>
- <name>jobExtractorConfig.fetchRunningJobInterval</name>
- <description>jobExtractorConfig.fetchRunningJobInterval</description>
+ <name>endpointConfig.fetchRunningJobInterval</name>
+ <displayName>Interval of Fetch Running Job From Resource Manager</displayName>
+ <description>interval of fetch map reduce running jobs from resource manager</description>
<value>60</value>
</property>
<property>
- <name>jobExtractorConfig.parseJobThreadPoolSize</name>
- <description>jobExtractorConfig.parseJobThreadPoolSize</description>
+ <name>endpointConfig.parseJobThreadPoolSize</name>
+ <displayName>Parse Job ThreadPool Size in Each Parse Task</displayName>
+ <description>parse job thread pool size in each parse task</description>
<value>5</value>
</property>
<property>
- <name>jobExtractorConfig.topAndBottomTaskByElapsedTime</name>
- <description>jobExtractorConfig.topAndBottomTaskByElapsedTime</description>
- <value>5</value>
- </property>
- <property>
- <name>dataSourceConfig.rmUrls</name>
- <description>dataSourceConfig.rmUrls</description>
+ <name>endpointConfig.rmUrls</name>
+ <displayName>Resource Manager URLs</displayName>
+ <description>resource manager urls of this site</description>
<value>http://sandbox.hortonworks.com:8088</value>
+ <required>true</required>
</property>
+
<property>
<name>MRConfigureKeys.jobConfigKey</name>
+ <displayName>Map Reduce Extracted Configuration Keys</displayName>
+ <description>which configures will be extracted from map reduce job configurations</description>
<value>mapreduce.map.output.compress,
mapreduce.map.output.compress.codec,
mapreduce.output.fileoutputformat.compress,
@@ -148,6 +86,8 @@
</property>
<property>
<name>MRConfigureKeys.jobNameKey</name>
+ <displayName>Map Reduce Job Name Key</displayName>
+ <description>User use -Dkey=value to specify name of a job when submit. use this to extract job name from job configuration</description>
<value>eagle.job.name</value>
</property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
index 9c354c7..6d1be06 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
@@ -18,49 +18,38 @@
"mode":"LOCAL",
application.storm.nimbusHost=localhost,
"workers" : 8,
- "envContextConfig" : {
- "stormConfigFile" : "storm.yaml",
- "parallelismConfig" : {
- "mrRunningJobFetchSpout" : 1,
- "mrRunningJobParseBolt" : 16
- },
- "tasks" : {
- "mrRunningJobFetchSpout" : 1,
- "mrRunningJobParseBolt" : 16
- }
- },
+ "siteId" : "sandbox",
- "jobExtractorConfig" : {
- "site" : "sandbox",
- "fetchRunningJobInterval" : 60,
- "parseJobThreadPoolSize" : 6,
- "topAndBottomTaskByElapsedTime" : 10
+ "stormConfig" : {
+ "mrRunningJobFetchSpoutTasks" : 1,
+ "mrRunningJobParseBoltTasks" : 8
},
- "zookeeperConfig" : {
+ "zookeeper" : {
"zkQuorum" : "sandbox.hortonworks.com:2181",
"zkPort" : "2181",
- "zkRoot" : "/apps/mr/running",
+ "zkRoot" : "/apps/mr/runningSandbox",
"zkSessionTimeoutMs" : 15000,
"zkRetryTimes" : 3,
"zkRetryInterval" : 20000
},
- "dataSourceConfig" : {
- "rmUrls": "http://sandbox.hortonworks.com:50030"
+ "endpointConfig" : {
+ "rmUrls": "http://sandbox.hortonworks.com:50030",
+ "fetchRunningJobInterval" : 60,
+ "parseJobThreadPoolSize" : 6,
},
- "eagleProps" : {
- "eagleService": {
- "host": "sandbox.hortonworks.com",
- "port": 9099,
- "readTimeOutSeconds" : 20,
- "maxFlushNum" : 500,
- "username": "admin",
- "password": "secret"
- }
+
+ "service": {
+ "host": "sandbox.hortonworks.com",
+ "port": 9099,
+ "readTimeOutSeconds" : 20,
+ "username": "admin",
+ "password": "secret"
},
+
"MRConfigureKeys" : {
"jobNameKey" : "eagle.job.name",
"jobConfigKey" : "mapreduce.map.output.compress, mapreduce.map.output.compress.codec, mapreduce.output.fileoutputformat.compress, mapreduce.output.fileoutputformat.compress.type, mapreduce.output.fileoutputformat.compress.codec, mapred.output.format.class, eagle.job.runid, eagle.job.runidfieldname, eagle.job.name, eagle.job.normalizedfieldname, eagle.alert.email, eagle.job.alertemailaddress, dataplatform.etl.info, mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.java.opts, mapreduce.reduce.java.opts"