You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/29 07:10:43 UTC

incubator-eagle git commit: [EAGLE-796] MRJobEntityCreationHandler flush need N times

Repository: incubator-eagle
Updated Branches:
  refs/heads/master dcefa2068 -> c69f94eff


[EAGLE-796] MRJobEntityCreationHandler flush need N times

EAGLE-796 MRJobEntityCreationHandler flush need N times
- It will retry until flushing has succeeded or tried more than 3 times.

https://issues.apache.org/jira/browse/EAGLE-796

Author: chitin <ch...@gmail.com>

Closes #691 from chitin/EAGLE-796.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c69f94ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c69f94ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c69f94ef

Branch: refs/heads/master
Commit: c69f94eff767a7c1cef3b66f23be21aae0d2ec71
Parents: dcefa20
Author: chitin <ch...@gmail.com>
Authored: Tue Nov 29 15:10:35 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Nov 29 15:10:35 2016 +0800

----------------------------------------------------------------------
 .../parser/MRJobEntityCreationHandler.java      | 34 ++++++++++++++++----
 .../jpm/mr/running/parser/MRJobParserTest.java  | 16 +++++++--
 2 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c69f94ef/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index ad80fd6..abd0594 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -26,7 +26,6 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +40,7 @@ public class MRJobEntityCreationHandler {
     private JobExecutionMetricsCreationListener jobMetricsListener;
     private TaskExecutionMetricsCreationListener taskMetricsListener;
     private static final int MAX_FLUSH_NUM = 1000;
+    private static final int MAX_RETRY_COUNT = 3;
 
     public MRJobEntityCreationHandler(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig) {
         this.eagleServiceConfig = eagleServiceConfig;
@@ -79,13 +79,9 @@ public class MRJobEntityCreationHandler {
             eagleServiceConfig.password);
         client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
         try {
-            LOG.info("start to flush mr job entities, size {}", entities.size());
-            client.create(entities);
-            LOG.info("finish flushing mr job entities, size {}", entities.size());
-            entities.clear();
+            return createEntities(client);
         } catch (Exception e) {
             LOG.warn("exception found when flush entities, {}", e);
-            e.printStackTrace();
             return false;
         } finally {
             client.getJerseyClient().destroy();
@@ -95,7 +91,31 @@ public class MRJobEntityCreationHandler {
                 // ignored exception
             }
         }
+    }
 
-        return true;
+    private boolean createEntities(IEagleServiceClient client) throws Exception {
+        int count = 0;
+        boolean success = false;
+        while (count++ < MAX_RETRY_COUNT && !success) {
+            try {
+                LOG.info("start to flush mr job entities, size {}", entities.size());
+                client.create(entities);
+                success = true;
+                LOG.info("finish flushing mr job entities, size {}", entities.size());
+                entities.clear();
+
+            } catch (Exception e) {
+                LOG.warn("exception found when flush entities, {}", e);
+                if (!success && count < MAX_RETRY_COUNT) {
+                    LOG.info("Sleep for a while before retrying");
+                    Thread.sleep(10 * 1000);
+                }
+            }
+
+        }
+        if (!success) {
+            LOG.warn("Fail flushing entities after tries {} times", MAX_RETRY_COUNT);
+        }
+        return success;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c69f94ef/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index 074c364..3a71384 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -346,8 +346,18 @@ public class MRJobParserTest {
 
 
     @Test
-    public void testMRJobParserFetchJobCountFailButRMalive() throws Exception {
+    public void testMRJobParserFetchJobCountFailButRMaliveRetry() throws Exception {
         setupMock();
+        reset(client);
+        client = mock(EagleServiceClientImpl.class);
+        MRRunningJobConfig.EagleServiceConfig eagleServiceConfig = mrRunningJobConfig.getEagleServiceConfig();
+        PowerMockito.whenNew(EagleServiceClientImpl.class).withArguments(
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password).thenReturn(client);
+        when(client.create(any())).thenThrow(Exception.class).thenReturn(null);
+        when(client.getJerseyClient()).thenReturn(new Client());
         mockInputJobSteam("/mrjob_30784.json", JOB_URL);
         mockInputJobSteamWithException(JOB_COUNT_URL);
         mockGetConnection("/mrconf_30784.xml");
@@ -390,7 +400,9 @@ public class MRJobParserTest {
         Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
         Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
         Assert.assertTrue(entities.isEmpty());
-        verify(client, times(1)).create(any());
+        verify(client, times(2)).create(any());
+        verify(client, times(2)).getJerseyClient();
+        verify(client, times(1)).close();
 
     }