You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/09 22:55:46 UTC

airavata git commit: Added thread pool to process emails

Repository: airavata
Updated Branches:
  refs/heads/airavata-0.15-release-branch a320a9474 -> 59e36d899


Added thread pool to process emails


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/59e36d89
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/59e36d89
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/59e36d89

Branch: refs/heads/airavata-0.15-release-branch
Commit: 59e36d89913fae0477a27c4fcfb070e127f3f897
Parents: a320a94
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Tue Jun 9 16:55:32 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Tue Jun 9 16:55:32 2015 -0400

----------------------------------------------------------------------
 .../gfac/monitor/email/EmailBasedMonitor.java   | 84 ++++++++++++++------
 1 file changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/59e36d89/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index a9363cb..f9b7eb5 100644
--- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -59,12 +59,19 @@ import javax.mail.search.FlagTerm;
 import javax.mail.search.SearchTerm;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class EmailBasedMonitor implements Runnable{
     private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class);
@@ -82,6 +89,7 @@ public class EmailBasedMonitor implements Runnable{
     private String host, emailAddress, password, storeProtocol, folderName ;
     private Date monitorStartDate;
     private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
+    private ExecutorService executor;
 
     public EmailBasedMonitor(ResourceJobManagerType type) throws AiravataException {
         init();
@@ -99,6 +107,8 @@ public class EmailBasedMonitor implements Runnable{
         }
         properties = new Properties();
         properties.put("mail.store.protocol", storeProtocol);
+        executor = Executors.newFixedThreadPool(30);
+
     }
 
     public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
@@ -215,37 +225,59 @@ public class EmailBasedMonitor implements Runnable{
     }
 
     private void processMessages(Message[] searchMessages) throws MessagingException {
-        List<Message> processedMessages = new ArrayList<>();
-        List<Message> unreadMessages = new ArrayList<>();
+        List<Message> processedMessages = Collections.synchronizedList(new ArrayList<>());
+        List<Message> unreadMessages = Collections.synchronizedList(new ArrayList<>());
+        List<Future<JobStatusResult>> futureList = new ArrayList<>();
+
+        // use thread pool to increase email processing
         for (Message message : searchMessages) {
-            try {
-                JobStatusResult jobStatusResult = parse(message);
-                JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
-                if (jEC == null) {
-                    jEC = jobMonitorMap.get(jobStatusResult.getJobName());
-                }
-                if (jEC != null) {
-                    process(jobStatusResult, jEC);
-                    processedMessages.add(message);
-                } else {
-                    // we can get JobExecutionContext null in multiple Gfac instances environment,
-                    // where this job is not submitted by this Gfac instance hence we ignore this message.
-                    unreadMessages.add(message);
+            Future<JobStatusResult> jobStatusFuture = executor.submit(new Callable<JobStatusResult>() {
+                JobStatusResult jobStatusResult = null;
+                @Override
+                public JobStatusResult call() throws Exception {
+                    try {
+                        jobStatusResult = parse(message);
+                        JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
+                        if (jEC == null) {
+                            jEC = jobMonitorMap.get(jobStatusResult.getJobName());
+                        }
+                        if (jEC != null) {
+                            process(jobStatusResult, jEC);
+                            processedMessages.add(message);
+                        } else {
+                            // we can get JobExecutionContext null in multiple Gfac instances environment,
+                            // where this job is not submitted by this Gfac instance hence we ignore this message.
+                            unreadMessages.add(message);
 //                  log.info("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId());
+                        }
+                    } catch (AiravataException e) {
+                        log.error("[EJM]: Error parsing email message =====================================>", e);
+                        try {
+                            writeEnvelopeOnError(message);
+                        } catch (MessagingException e1) {
+                            log.error("[EJM]: Error printing envelop of the email");
+                        }
+                        unreadMessages.add(message);
+                    } catch (MessagingException e) {
+                        log.error("[EJM]: Error while retrieving sender address from message : " + message.toString());
+                        unreadMessages.add(message);
+                    }
+                    return jobStatusResult;
                 }
-            } catch (AiravataException e) {
-                log.error("[EJM]: Error parsing email message =====================================>", e);
-                try {
-                    writeEnvelopeOnError(message);
-                } catch (MessagingException e1) {
-                    log.error("[EJM]: Error printing envelop of the email");
-                }
-                unreadMessages.add(message);
-            } catch (MessagingException e) {
-                log.error("[EJM]: Error while retrieving sender address from message : " + message.toString());
-                unreadMessages.add(message);
+            });
+            futureList.add(jobStatusFuture);
+        }
+        // wait until all thread returns
+        for (Future<JobStatusResult> jobStatusResultFuture : futureList) {
+            try {
+                jobStatusResultFuture.get();
+            } catch (InterruptedException e) {
+                log.error("Error while calling future.get() ", e);
+            } catch (ExecutionException e) {
+                log.error("Error while calling future.get()", e);
             }
         }
+
         if (!processedMessages.isEmpty()) {
             Message[] seenMessages = new Message[processedMessages.size()];
             processedMessages.toArray(seenMessages);