You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/11 15:43:53 UTC

airavata git commit: Infinitely retry to recover in case of monitoring failure

Repository: airavata
Updated Branches:
  refs/heads/airavata-0.15-release-branch 7b9418ae9 -> dce8796bf


Infinitely retry to recover in case of monitoring failure


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dce8796b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dce8796b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dce8796b

Branch: refs/heads/airavata-0.15-release-branch
Commit: dce8796bff6bae5ca28785975830e832ceaadf99
Parents: 7b9418a
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Jun 11 09:43:48 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Jun 11 09:43:48 2015 -0400

----------------------------------------------------------------------
 .../gfac/monitor/email/EmailBasedMonitor.java   | 53 ++++++++++++++++----
 1 file changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/dce8796b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index f9b7eb5..4959449 100644
--- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -90,6 +90,7 @@ public class EmailBasedMonitor implements Runnable{
     private Date monitorStartDate;
     private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
     private ExecutorService executor;
+    private Message[] flushUnseenMessages;
 
     public EmailBasedMonitor(ResourceJobManagerType type) throws AiravataException {
         init();
@@ -184,7 +185,6 @@ public class EmailBasedMonitor implements Runnable{
             store.connect(host, emailAddress, password);
             emailFolder = store.getFolder(folderName);
             // first time we search for all unread messages.
-            SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false);
             while (!(stopMonitoring || ServerSettings.isStopAllThreads())) {
                 Thread.sleep(ServerSettings.getEmailMonitorPeriod());// sleep a bit - get a rest till job finishes
                 if (jobMonitorMap.isEmpty()) {
@@ -197,15 +197,30 @@ public class EmailBasedMonitor implements Runnable{
                     store.connect();
                     emailFolder = store.getFolder(folderName);
                 }
-                log.info("[EJM]: Retrieving unseen emails");
                 emailFolder.open(Folder.READ_WRITE);
-                Message[] searchMessages = emailFolder.search(unseenBefore);
-                if (searchMessages == null || searchMessages.length == 0) {
-                    log.info("[EJM]: No new email messages");
-                } else {
-                    log.info("[EJM]: "+searchMessages.length + " new email/s received");
+                if (emailFolder.isOpen()) {
+                    // flush if any message left in flushUnseenMessage
+                    if (flushUnseenMessages != null && flushUnseenMessages.length > 0) {
+                        try {
+                            emailFolder.setFlags(flushUnseenMessages, new Flags(Flags.Flag.SEEN), false);
+                            flushUnseenMessages = null;
+                        } catch (MessagingException e) {
+                            if (!store.isConnected()) {
+                                store.connect();
+                                emailFolder.setFlags(flushUnseenMessages, new Flags(Flags.Flag.SEEN), false);
+                                flushUnseenMessages = null;
+                            }
+                        }
+                    }
+                    // read unread messages
+                    Message[] searchMessages = getMessagesToProcess();
+                    if (searchMessages == null || searchMessages.length == 0) {
+                        log.info("[EJM]: No new email messages");
+                    } else {
+                        log.info("[EJM]: " + searchMessages.length + " new email/s received");
+                    }
+                    processMessages(searchMessages);
                 }
-                processMessages(searchMessages);
                 emailFolder.close(false);
             }
         } catch (MessagingException e) {
@@ -222,6 +237,19 @@ public class EmailBasedMonitor implements Runnable{
                 log.error("[EJM]: Store close operation failed, couldn't close store", e);
             }
         }
+        // Recursively try to connect to email server and monitor
+        if (!(stopMonitoring || ServerSettings.isStopAllThreads())) {
+            log.info("[EJM]: Retry email monitoring on exceptions");
+            run();
+        }
+
+    }
+
+    private Message[] getMessagesToProcess() throws MessagingException {
+        //TODO: improve this logic to get only previously unprocessed unread mails.
+        SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false);
+        log.info("[EJM]: Retrieving unseen emails");
+        return emailFolder.search(unseenBefore);
     }
 
     private void processMessages(Message[] searchMessages) throws MessagingException {
@@ -298,8 +326,13 @@ public class EmailBasedMonitor implements Runnable{
                 emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
             } catch (MessagingException e) {
                 if (!store.isConnected()) {
-                    store.connect();
-                    emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
+                    try {
+                        store.connect();
+                        emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
+                    } catch (MessagingException e1) {
+                        flushUnseenMessages = unseenMessages; // anyway we need to push this update.
+                        throw e1;
+                    }
 
                 }
             }