You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2017/02/20 03:23:13 UTC
eagle git commit: [MINOR] remove useless code and fix fetch running
job config time out
Repository: eagle
Updated Branches:
refs/heads/master 73d03b9e5 -> 3027e5fbc
[MINOR] remove useless code and fix fetch running job config time out
Author: wujinhu <wu...@126.com>
Closes #809 from wujinhu/EAGLE-844.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/3027e5fb
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/3027e5fb
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/3027e5fb
Branch: refs/heads/master
Commit: 3027e5fbc7d30f938371c17caddb3b2d91ad5b54
Parents: 73d03b9
Author: wujinhu <wu...@126.com>
Authored: Mon Feb 20 11:23:09 2017 +0800
Committer: wujinhu <wu...@126.com>
Committed: Mon Feb 20 11:23:09 2017 +0800
----------------------------------------------------------------------
.../jpm/mr/running/MRRunningJobApplication.java | 3 +--
.../eagle/jpm/mr/running/parser/MRJobParser.java | 15 ++++++---------
.../jpm/mr/running/storm/MRRunningJobParseBolt.java | 10 ++--------
.../jpm/mr/running/MRRunningJobApplicationTest.java | 3 +--
.../jpm/mr/running/parser/MRJobParserTest.java | 16 ++++++++--------
5 files changed, 18 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index 309146e..7b1e2fb 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -68,8 +68,7 @@ public class MRRunningJobApplication extends StormApplication {
mrRunningJobConfig.getEndpointConfig(),
mrRunningJobConfig.getZkStateConfig(),
confKeyKeys,
- config,
- new MRJobPerformanceAnalyzer(config)),
+ config),
tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
return topologyBuilder.createTopology();
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 525ffc2..0f2ede6 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -77,14 +77,12 @@ public class MRJobParser implements Runnable {
private Map<String, String> commonTags = new HashMap<>();
private MRRunningJobManager runningJobManager;
private ParserStatus parserStatus;
- private ResourceFetcher rmResourceFetcher;
private Set<String> finishedTaskIds;
private List<String> configKeys;
private static final int TOP_BOTTOM_TASKS_BY_ELAPSED_TIME = 10;
private static final int FLUSH_TASKS_EVERY_TIME = 5;
private static final int MAX_TASKS_PERMIT = 5000;
private Config config;
- private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer;
static {
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
@@ -93,10 +91,9 @@ public class MRJobParser implements Runnable {
public MRJobParser(MRRunningJobConfig.EndpointConfig endpointConfig,
MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
- MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher,
+ MRRunningJobManager runningJobManager,
List<String> configKeys,
- Config config,
- MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) {
+ Config config) {
this.app = app;
if (mrJobMap == null) {
this.mrJobEntityMap = new HashMap<>();
@@ -112,11 +109,9 @@ public class MRJobParser implements Runnable {
this.commonTags.put(MRJobTagName.JOB_QUEUE.toString(), app.getQueue());
this.runningJobManager = runningJobManager;
this.parserStatus = ParserStatus.FINISHED;
- this.rmResourceFetcher = rmResourceFetcher;
this.finishedTaskIds = new HashSet<>();
this.configKeys = configKeys;
this.config = config;
- this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer;
}
public void setAppInfo(AppInfo app) {
@@ -175,7 +170,6 @@ public class MRJobParser implements Runnable {
break;
}
}
- mrJobPerformanceAnalyzer.analyze(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
}
}
@@ -572,7 +566,10 @@ public class MRJobParser implements Runnable {
LOG.warn("exception found when process application {}, {}", app.getId(), e);
} finally {
for (String jobId : mrJobEntityMap.keySet()) {
- mrJobEntityCreationHandler.add(mrJobEntityMap.get(jobId));
+ JobExecutionAPIEntity entity = mrJobEntityMap.get(jobId);
+ if (entity.getTags().containsKey(MRJobTagName.JOB_TYPE.toString())) {
+ mrJobEntityCreationHandler.add(entity);
+ }
}
if (mrJobEntityCreationHandler.flush()) { //force flush
//we must flush entities before delete from zk in case of missing finish state of jobs
http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 915df8a..a8db603 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -49,24 +49,20 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
private Map<String, MRJobParser> runningMRParsers;
private transient MRRunningJobManager runningJobManager;
private MRRunningJobConfig.EagleServiceConfig eagleServiceConfig;
- private ResourceFetcher resourceFetcher;
private List<String> configKeys;
private Config config;
- private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer;
public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
MRRunningJobConfig.EndpointConfig endpointConfig,
MRRunningJobConfig.ZKStateConfig zkStateConfig,
List<String> configKeys,
- Config config,
- MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) {
+ Config config) {
this.eagleServiceConfig = eagleServiceConfig;
this.endpointConfig = endpointConfig;
this.runningMRParsers = new HashMap<>();
this.zkStateConfig = zkStateConfig;
this.configKeys = configKeys;
this.config = config;
- this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer;
}
@Override
@@ -74,7 +70,6 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
this.executorService = Executors.newFixedThreadPool(endpointConfig.parseJobThreadPoolSize);
this.runningJobManager = new MRRunningJobManager(zkStateConfig);
- this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
}
@Override
@@ -87,8 +82,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
MRJobParser applicationParser;
if (!runningMRParsers.containsKey(appInfo.getId())) {
applicationParser = new MRJobParser(endpointConfig, eagleServiceConfig,
- appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config,
- mrJobPerformanceAnalyzer);
+ appInfo, mrJobs, runningJobManager, configKeys, this.config);
runningMRParsers.put(appInfo.getId(), applicationParser);
LOG.info("create application parser for {}", appInfo.getId());
} else {
http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
index f4bd2fa..5ebd9c5 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -89,8 +89,7 @@ public class MRRunningJobApplicationTest {
mrRunningJobConfig.getEndpointConfig(),
mrRunningJobConfig.getZkStateConfig(),
confKeyKeys,
- config,
- new MRJobPerformanceAnalyzer(config));
+ config);
MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class);
PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager);
mrRunningJobParseBolt.prepare(null, null, null);
http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index a4748ac..227eecb 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -132,7 +132,7 @@ public class MRJobParserTest {
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+ app1, mrJobs, runningJobManager, confKeyKeys, config);
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -187,7 +187,7 @@ public class MRJobParserTest {
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+ app1, mrJobs, runningJobManager, confKeyKeys, config);
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -229,7 +229,7 @@ public class MRJobParserTest {
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+ app1, mrJobs, runningJobManager, confKeyKeys, config);
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -248,7 +248,7 @@ public class MRJobParserTest {
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
Assert.assertTrue(entities.isEmpty());
- verify(client, times(1)).create(any());
+ verify(client, times(0)).create(any());
}
@@ -273,7 +273,7 @@ public class MRJobParserTest {
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+ app1, mrJobs, runningJobManager, confKeyKeys, config);
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -319,7 +319,7 @@ public class MRJobParserTest {
RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+ app1, mrJobs, runningJobManager, confKeyKeys, config);
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -342,7 +342,7 @@ public class MRJobParserTest {
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
Assert.assertTrue(entities.isEmpty());
- verify(client, times(1)).create(any());
+ verify(client, times(0)).create(any());
}
@@ -378,7 +378,7 @@ public class MRJobParserTest {
RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+ app1, mrJobs, runningJobManager, confKeyKeys, config);
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);