You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2017/02/20 03:23:13 UTC

eagle git commit: [MINOR] remove useless code and fix timeout when fetching running job config

Repository: eagle
Updated Branches:
  refs/heads/master 73d03b9e5 -> 3027e5fbc


[MINOR] remove useless code and fix timeout when fetching running job config

Author: wujinhu <wu...@126.com>

Closes #809 from wujinhu/EAGLE-844.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/3027e5fb
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/3027e5fb
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/3027e5fb

Branch: refs/heads/master
Commit: 3027e5fbc7d30f938371c17caddb3b2d91ad5b54
Parents: 73d03b9
Author: wujinhu <wu...@126.com>
Authored: Mon Feb 20 11:23:09 2017 +0800
Committer: wujinhu <wu...@126.com>
Committed: Mon Feb 20 11:23:09 2017 +0800

----------------------------------------------------------------------
 .../jpm/mr/running/MRRunningJobApplication.java     |  3 +--
 .../eagle/jpm/mr/running/parser/MRJobParser.java    | 15 ++++++---------
 .../jpm/mr/running/storm/MRRunningJobParseBolt.java | 10 ++--------
 .../jpm/mr/running/MRRunningJobApplicationTest.java |  3 +--
 .../jpm/mr/running/parser/MRJobParserTest.java      | 16 ++++++++--------
 5 files changed, 18 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index 309146e..7b1e2fb 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -68,8 +68,7 @@ public class MRRunningJobApplication extends StormApplication {
                 mrRunningJobConfig.getEndpointConfig(),
                 mrRunningJobConfig.getZkStateConfig(),
                 confKeyKeys,
-                config,
-                new MRJobPerformanceAnalyzer(config)),
+                config),
                 tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
         return topologyBuilder.createTopology();
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 525ffc2..0f2ede6 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -77,14 +77,12 @@ public class MRJobParser implements Runnable {
     private Map<String, String> commonTags = new HashMap<>();
     private MRRunningJobManager runningJobManager;
     private ParserStatus parserStatus;
-    private ResourceFetcher rmResourceFetcher;
     private Set<String> finishedTaskIds;
     private List<String> configKeys;
     private static final int TOP_BOTTOM_TASKS_BY_ELAPSED_TIME = 10;
     private static final int FLUSH_TASKS_EVERY_TIME = 5;
     private static final int MAX_TASKS_PERMIT = 5000;
     private Config config;
-    private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer;
 
     static {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
@@ -93,10 +91,9 @@ public class MRJobParser implements Runnable {
     public MRJobParser(MRRunningJobConfig.EndpointConfig endpointConfig,
                        MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
                        AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
-                       MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher,
+                       MRRunningJobManager runningJobManager,
                        List<String> configKeys,
-                       Config config,
-                       MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) {
+                       Config config) {
         this.app = app;
         if (mrJobMap == null) {
             this.mrJobEntityMap = new HashMap<>();
@@ -112,11 +109,9 @@ public class MRJobParser implements Runnable {
         this.commonTags.put(MRJobTagName.JOB_QUEUE.toString(), app.getQueue());
         this.runningJobManager = runningJobManager;
         this.parserStatus = ParserStatus.FINISHED;
-        this.rmResourceFetcher = rmResourceFetcher;
         this.finishedTaskIds = new HashSet<>();
         this.configKeys = configKeys;
         this.config = config;
-        this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer;
     }
 
     public void setAppInfo(AppInfo app) {
@@ -175,7 +170,6 @@ public class MRJobParser implements Runnable {
                     break;
                 }
             }
-            mrJobPerformanceAnalyzer.analyze(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
         }
     }
 
@@ -572,7 +566,10 @@ public class MRJobParser implements Runnable {
                 LOG.warn("exception found when process application {}, {}", app.getId(), e);
             } finally {
                 for (String jobId : mrJobEntityMap.keySet()) {
-                    mrJobEntityCreationHandler.add(mrJobEntityMap.get(jobId));
+                    JobExecutionAPIEntity entity = mrJobEntityMap.get(jobId);
+                    if (entity.getTags().containsKey(MRJobTagName.JOB_TYPE.toString())) {
+                        mrJobEntityCreationHandler.add(entity);
+                    }
                 }
                 if (mrJobEntityCreationHandler.flush()) { //force flush
                     //we must flush entities before delete from zk in case of missing finish state of jobs

http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 915df8a..a8db603 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -49,24 +49,20 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
     private Map<String, MRJobParser> runningMRParsers;
     private transient MRRunningJobManager runningJobManager;
     private MRRunningJobConfig.EagleServiceConfig eagleServiceConfig;
-    private ResourceFetcher resourceFetcher;
     private List<String> configKeys;
     private Config config;
-    private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer;
 
     public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
                                  MRRunningJobConfig.EndpointConfig endpointConfig,
                                  MRRunningJobConfig.ZKStateConfig zkStateConfig,
                                  List<String> configKeys,
-                                 Config config,
-                                 MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) {
+                                 Config config) {
         this.eagleServiceConfig = eagleServiceConfig;
         this.endpointConfig = endpointConfig;
         this.runningMRParsers = new HashMap<>();
         this.zkStateConfig = zkStateConfig;
         this.configKeys = configKeys;
         this.config = config;
-        this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer;
     }
 
     @Override
@@ -74,7 +70,6 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
         this.executorService = Executors.newFixedThreadPool(endpointConfig.parseJobThreadPoolSize);
 
         this.runningJobManager = new MRRunningJobManager(zkStateConfig);
-        this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
     }
 
     @Override
@@ -87,8 +82,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
         MRJobParser applicationParser;
         if (!runningMRParsers.containsKey(appInfo.getId())) {
             applicationParser = new MRJobParser(endpointConfig, eagleServiceConfig,
-                    appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config,
-                    mrJobPerformanceAnalyzer);
+                    appInfo, mrJobs, runningJobManager, configKeys, this.config);
             runningMRParsers.put(appInfo.getId(), applicationParser);
             LOG.info("create application parser for {}", appInfo.getId());
         } else {

http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
index f4bd2fa..5ebd9c5 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -89,8 +89,7 @@ public class MRRunningJobApplicationTest {
                 mrRunningJobConfig.getEndpointConfig(),
                 mrRunningJobConfig.getZkStateConfig(),
                 confKeyKeys,
-                config,
-                new MRJobPerformanceAnalyzer(config));
+                config);
         MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class);
         PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager);
         mrRunningJobParseBolt.prepare(null, null, null);

http://git-wip-us.apache.org/repos/asf/eagle/blob/3027e5fb/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index a4748ac..227eecb 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -132,7 +132,7 @@ public class MRJobParserTest {
         MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
         RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+                app1, mrJobs, runningJobManager, confKeyKeys, config);
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -187,7 +187,7 @@ public class MRJobParserTest {
         MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
         RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+                app1, mrJobs, runningJobManager, confKeyKeys, config);
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -229,7 +229,7 @@ public class MRJobParserTest {
         MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
         RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+                app1, mrJobs, runningJobManager, confKeyKeys, config);
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -248,7 +248,7 @@ public class MRJobParserTest {
         Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
         Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
         Assert.assertTrue(entities.isEmpty());
-        verify(client, times(1)).create(any());
+        verify(client, times(0)).create(any());
     }
 
 
@@ -273,7 +273,7 @@ public class MRJobParserTest {
         MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
         RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+                app1, mrJobs, runningJobManager, confKeyKeys, config);
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -319,7 +319,7 @@ public class MRJobParserTest {
         RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
         when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+                app1, mrJobs, runningJobManager, confKeyKeys, config);
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -342,7 +342,7 @@ public class MRJobParserTest {
         Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
         Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
         Assert.assertTrue(entities.isEmpty());
-        verify(client, times(1)).create(any());
+        verify(client, times(0)).create(any());
     }
 
 
@@ -378,7 +378,7 @@ public class MRJobParserTest {
         RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
         when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
+                app1, mrJobs, runningJobManager, confKeyKeys, config);
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);