You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2017/01/09 14:10:33 UTC

eagle git commit: [MINOR] optimize mr running job list api

Repository: eagle
Updated Branches:
  refs/heads/master c9c475e2a -> 02d6cce73


[MINOR] optimize mr running job list api

Author: wujinhu <wu...@126.com>

Closes #768 from wujinhu/EAGLE-842.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/02d6cce7
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/02d6cce7
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/02d6cce7

Branch: refs/heads/master
Commit: 02d6cce73679ce90fe610dbdb30fd1cbdd062897
Parents: c9c475e
Author: wujinhu <wu...@126.com>
Authored: Mon Jan 9 22:10:16 2017 +0800
Committer: wujinhu <wu...@126.com>
Committed: Mon Jan 9 22:10:16 2017 +0800

----------------------------------------------------------------------
 .../eagle/jpm/mr/running/parser/MRJobParser.java      | 11 ++++-------
 .../eagle/jpm/mr/running/parser/MRJobParserTest.java  | 14 +++++++-------
 2 files changed, 11 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/02d6cce7/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index d866c1c..52c1866 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -142,10 +142,9 @@ public class MRJobParser implements Runnable {
             if (fetchMRJobs()) {
                 break;
             } else if (i >= MAX_RETRY_TIMES - 1) {
-                //check whether the app has finished. if we test that we can connect rm, then we consider the jobs have finished
-                //if we get here either because of cannot connect rm or the jobs have finished
-                rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
-                mrJobEntityMap.keySet().forEach(this::finishMRJob);
+                if (app.getState().equals(Constants.AppState.FINISHED.toString())) {
+                    mrJobEntityMap.keySet().forEach(this::finishMRJob);
+                }
                 return;
             }
         }
@@ -166,9 +165,6 @@ public class MRJobParser implements Runnable {
                     }
                 }
                 if (i >= MAX_RETRY_TIMES) {
-                    //may caused by rm unreachable
-                    rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
-                    finishMRJob(jobId);
                     break;
                 }
             }
@@ -575,6 +571,7 @@ public class MRJobParser implements Runnable {
                     //delete from zk if needed
                     mrJobEntityMap.keySet()
                         .stream()
+                        .filter(jobId -> mrJobEntityMap.get(jobId).getInternalState() != null)
                         .filter(
                             jobId -> mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FINISHED.toString())
                                 || mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FAILED.toString()))

http://git-wip-us.apache.org/repos/asf/eagle/blob/02d6cce7/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index a2fb6ca..e0b5533 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -335,9 +335,9 @@ public class MRJobParserTest {
         Assert.assertTrue(jobIdToJobConfig.isEmpty());
         Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
         JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
-        Assert.assertEquals(Constants.AppState.FINISHED.toString(), jobExecutionAPIEntity.getInternalState());
+        Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getInternalState());
         Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState());
-        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.APP_FINISHED);
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
         Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
         Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
         Assert.assertTrue(entities.isEmpty());
@@ -391,14 +391,14 @@ public class MRJobParserTest {
 
         mrJobParser.run();
 
-        Assert.assertTrue(jobIdToJobConfig.isEmpty());
+        Assert.assertTrue(!jobIdToJobConfig.isEmpty());
         Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
         JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
-        Assert.assertEquals(Constants.AppState.FINISHED.toString(), jobExecutionAPIEntity.getInternalState());
+        Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getInternalState());
         Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState());
-        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.APP_FINISHED);
-        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
-        Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) != null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null);
         Assert.assertTrue(entities.isEmpty());
         verify(client, times(2)).create(any());
         verify(client, times(1)).getJerseyClient();