You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/11 15:43:53 UTC
airavata git commit: Infinitely retry to recover in case of monitoring
failure
Repository: airavata
Updated Branches:
refs/heads/airavata-0.15-release-branch 7b9418ae9 -> dce8796bf
Infinitely retry to recover in case of monitoring failure
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dce8796b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dce8796b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dce8796b
Branch: refs/heads/airavata-0.15-release-branch
Commit: dce8796bff6bae5ca28785975830e832ceaadf99
Parents: 7b9418a
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Jun 11 09:43:48 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Jun 11 09:43:48 2015 -0400
----------------------------------------------------------------------
.../gfac/monitor/email/EmailBasedMonitor.java | 53 ++++++++++++++++----
1 file changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/dce8796b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index f9b7eb5..4959449 100644
--- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -90,6 +90,7 @@ public class EmailBasedMonitor implements Runnable{
private Date monitorStartDate;
private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
private ExecutorService executor;
+ private Message[] flushUnseenMessages;
public EmailBasedMonitor(ResourceJobManagerType type) throws AiravataException {
init();
@@ -184,7 +185,6 @@ public class EmailBasedMonitor implements Runnable{
store.connect(host, emailAddress, password);
emailFolder = store.getFolder(folderName);
// first time we search for all unread messages.
- SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false);
while (!(stopMonitoring || ServerSettings.isStopAllThreads())) {
Thread.sleep(ServerSettings.getEmailMonitorPeriod());// sleep a bit - get a rest till job finishes
if (jobMonitorMap.isEmpty()) {
@@ -197,15 +197,30 @@ public class EmailBasedMonitor implements Runnable{
store.connect();
emailFolder = store.getFolder(folderName);
}
- log.info("[EJM]: Retrieving unseen emails");
emailFolder.open(Folder.READ_WRITE);
- Message[] searchMessages = emailFolder.search(unseenBefore);
- if (searchMessages == null || searchMessages.length == 0) {
- log.info("[EJM]: No new email messages");
- } else {
- log.info("[EJM]: "+searchMessages.length + " new email/s received");
+ if (emailFolder.isOpen()) {
+ // flush if any message left in flushUnseenMessage
+ if (flushUnseenMessages != null && flushUnseenMessages.length > 0) {
+ try {
+ emailFolder.setFlags(flushUnseenMessages, new Flags(Flags.Flag.SEEN), false);
+ flushUnseenMessages = null;
+ } catch (MessagingException e) {
+ if (!store.isConnected()) {
+ store.connect();
+ emailFolder.setFlags(flushUnseenMessages, new Flags(Flags.Flag.SEEN), false);
+ flushUnseenMessages = null;
+ }
+ }
+ }
+ // read unread messages
+ Message[] searchMessages = getMessagesToProcess();
+ if (searchMessages == null || searchMessages.length == 0) {
+ log.info("[EJM]: No new email messages");
+ } else {
+ log.info("[EJM]: " + searchMessages.length + " new email/s received");
+ }
+ processMessages(searchMessages);
}
- processMessages(searchMessages);
emailFolder.close(false);
}
} catch (MessagingException e) {
@@ -222,6 +237,19 @@ public class EmailBasedMonitor implements Runnable{
log.error("[EJM]: Store close operation failed, couldn't close store", e);
}
}
+ // Recursively try to connect to email server and monitor
+ if (!(stopMonitoring || ServerSettings.isStopAllThreads())) {
+ log.info("[EJM]: Retry email monitoring on exceptions");
+ run();
+ }
+
+ }
+
+ private Message[] getMessagesToProcess() throws MessagingException {
+ //TODO: improve this logic to get only previously unprocessed unread mails.
+ SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false);
+ log.info("[EJM]: Retrieving unseen emails");
+ return emailFolder.search(unseenBefore);
}
private void processMessages(Message[] searchMessages) throws MessagingException {
@@ -298,8 +326,13 @@ public class EmailBasedMonitor implements Runnable{
emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
} catch (MessagingException e) {
if (!store.isConnected()) {
- store.connect();
- emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
+ try {
+ store.connect();
+ emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
+ } catch (MessagingException e1) {
+ flushUnseenMessages = unseenMessages; // anyway we need to push this update.
+ throw e1;
+ }
}
}