You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/29 07:10:43 UTC
incubator-eagle git commit: [EAGLE-796] MRJobEntityCreationHandler flush need N times
Repository: incubator-eagle
Updated Branches:
refs/heads/master dcefa2068 -> c69f94eff
[EAGLE-796] MRJobEntityCreationHandler flush need N times
EAGLE-796 MRJobEntityCreationHandler flush need N times
- Flushing is retried until it succeeds or has been attempted 3 times (MAX_RETRY_COUNT), sleeping 10 seconds between attempts.
https://issues.apache.org/jira/browse/EAGLE-796
Author: chitin <ch...@gmail.com>
Closes #691 from chitin/EAGLE-796.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c69f94ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c69f94ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c69f94ef
Branch: refs/heads/master
Commit: c69f94eff767a7c1cef3b66f23be21aae0d2ec71
Parents: dcefa20
Author: chitin <ch...@gmail.com>
Authored: Tue Nov 29 15:10:35 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Nov 29 15:10:35 2016 +0800
----------------------------------------------------------------------
.../parser/MRJobEntityCreationHandler.java | 34 ++++++++++++++++----
.../jpm/mr/running/parser/MRJobParserTest.java | 16 +++++++--
2 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c69f94ef/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index ad80fd6..abd0594 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -26,7 +26,6 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +40,7 @@ public class MRJobEntityCreationHandler {
private JobExecutionMetricsCreationListener jobMetricsListener;
private TaskExecutionMetricsCreationListener taskMetricsListener;
private static final int MAX_FLUSH_NUM = 1000;
+ private static final int MAX_RETRY_COUNT = 3;
public MRJobEntityCreationHandler(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig) {
this.eagleServiceConfig = eagleServiceConfig;
@@ -79,13 +79,9 @@ public class MRJobEntityCreationHandler {
eagleServiceConfig.password);
client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
try {
- LOG.info("start to flush mr job entities, size {}", entities.size());
- client.create(entities);
- LOG.info("finish flushing mr job entities, size {}", entities.size());
- entities.clear();
+ return createEntities(client);
} catch (Exception e) {
LOG.warn("exception found when flush entities, {}", e);
- e.printStackTrace();
return false;
} finally {
client.getJerseyClient().destroy();
@@ -95,7 +91,31 @@ public class MRJobEntityCreationHandler {
// ignored exception
}
}
+ }
- return true;
+ private boolean createEntities(IEagleServiceClient client) throws Exception {
+ int count = 0;
+ boolean success = false;
+ while (count++ < MAX_RETRY_COUNT && !success) {
+ try {
+ LOG.info("start to flush mr job entities, size {}", entities.size());
+ client.create(entities);
+ success = true;
+ LOG.info("finish flushing mr job entities, size {}", entities.size());
+ entities.clear();
+
+ } catch (Exception e) {
+ LOG.warn("exception found when flush entities, {}", e);
+ if (!success && count < MAX_RETRY_COUNT) {
+ LOG.info("Sleep for a while before retrying");
+ Thread.sleep(10 * 1000);
+ }
+ }
+
+ }
+ if (!success) {
+ LOG.warn("Fail flushing entities after tries {} times", MAX_RETRY_COUNT);
+ }
+ return success;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c69f94ef/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index 074c364..3a71384 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -346,8 +346,18 @@ public class MRJobParserTest {
@Test
- public void testMRJobParserFetchJobCountFailButRMalive() throws Exception {
+ public void testMRJobParserFetchJobCountFailButRMaliveRetry() throws Exception {
setupMock();
+ reset(client);
+ client = mock(EagleServiceClientImpl.class);
+ MRRunningJobConfig.EagleServiceConfig eagleServiceConfig = mrRunningJobConfig.getEagleServiceConfig();
+ PowerMockito.whenNew(EagleServiceClientImpl.class).withArguments(
+ eagleServiceConfig.eagleServiceHost,
+ eagleServiceConfig.eagleServicePort,
+ eagleServiceConfig.username,
+ eagleServiceConfig.password).thenReturn(client);
+ when(client.create(any())).thenThrow(Exception.class).thenReturn(null);
+ when(client.getJerseyClient()).thenReturn(new Client());
mockInputJobSteam("/mrjob_30784.json", JOB_URL);
mockInputJobSteamWithException(JOB_COUNT_URL);
mockGetConnection("/mrconf_30784.xml");
@@ -390,7 +400,9 @@ public class MRJobParserTest {
Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
Assert.assertTrue(entities.isEmpty());
- verify(client, times(1)).create(any());
+ verify(client, times(2)).create(any());
+ verify(client, times(2)).getJerseyClient();
+ verify(client, times(1)).close();
}