You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@eagle.apache.org by ji...@apache.org on 2017/01/09 14:10:33 UTC
eagle git commit: [MINOR] optimize mr running job list api
Repository: eagle
Updated Branches:
refs/heads/master c9c475e2a -> 02d6cce73
[MINOR] optimize mr running job list api
Author: wujinhu <wu...@126.com>
Closes #768 from wujinhu/EAGLE-842.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/02d6cce7
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/02d6cce7
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/02d6cce7
Branch: refs/heads/master
Commit: 02d6cce73679ce90fe610dbdb30fd1cbdd062897
Parents: c9c475e
Author: wujinhu <wu...@126.com>
Authored: Mon Jan 9 22:10:16 2017 +0800
Committer: wujinhu <wu...@126.com>
Committed: Mon Jan 9 22:10:16 2017 +0800
----------------------------------------------------------------------
.../eagle/jpm/mr/running/parser/MRJobParser.java | 11 ++++-------
.../eagle/jpm/mr/running/parser/MRJobParserTest.java | 14 +++++++-------
2 files changed, 11 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/02d6cce7/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index d866c1c..52c1866 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -142,10 +142,9 @@ public class MRJobParser implements Runnable {
if (fetchMRJobs()) {
break;
} else if (i >= MAX_RETRY_TIMES - 1) {
- //check whether the app has finished. if we test that we can connect rm, then we consider the jobs have finished
- //if we get here either because of cannot connect rm or the jobs have finished
- rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
- mrJobEntityMap.keySet().forEach(this::finishMRJob);
+ if (app.getState().equals(Constants.AppState.FINISHED.toString())) {
+ mrJobEntityMap.keySet().forEach(this::finishMRJob);
+ }
return;
}
}
@@ -166,9 +165,6 @@ public class MRJobParser implements Runnable {
}
}
if (i >= MAX_RETRY_TIMES) {
- //may caused by rm unreachable
- rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
- finishMRJob(jobId);
break;
}
}
@@ -575,6 +571,7 @@ public class MRJobParser implements Runnable {
//delete from zk if needed
mrJobEntityMap.keySet()
.stream()
+ .filter(jobId -> mrJobEntityMap.get(jobId).getInternalState() != null)
.filter(
jobId -> mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FINISHED.toString())
|| mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FAILED.toString()))
http://git-wip-us.apache.org/repos/asf/eagle/blob/02d6cce7/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index a2fb6ca..e0b5533 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -335,9 +335,9 @@ public class MRJobParserTest {
Assert.assertTrue(jobIdToJobConfig.isEmpty());
Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
- Assert.assertEquals(Constants.AppState.FINISHED.toString(), jobExecutionAPIEntity.getInternalState());
+ Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getInternalState());
Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState());
- Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.APP_FINISHED);
+ Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
Assert.assertTrue(entities.isEmpty());
@@ -391,14 +391,14 @@ public class MRJobParserTest {
mrJobParser.run();
- Assert.assertTrue(jobIdToJobConfig.isEmpty());
+ Assert.assertTrue(!jobIdToJobConfig.isEmpty());
Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
- Assert.assertEquals(Constants.AppState.FINISHED.toString(), jobExecutionAPIEntity.getInternalState());
+ Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getInternalState());
Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState());
- Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.APP_FINISHED);
- Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
- Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
+ Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+ Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) != null);
+ Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null);
Assert.assertTrue(entities.isEmpty());
verify(client, times(2)).create(any());
verify(client, times(1)).getJerseyClient();