You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/09 22:55:46 UTC
airavata git commit: Added thread pool to process emails
Repository: airavata
Updated Branches:
refs/heads/airavata-0.15-release-branch a320a9474 -> 59e36d899
Added thread pool to process emails
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/59e36d89
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/59e36d89
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/59e36d89
Branch: refs/heads/airavata-0.15-release-branch
Commit: 59e36d89913fae0477a27c4fcfb070e127f3f897
Parents: a320a94
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Tue Jun 9 16:55:32 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Tue Jun 9 16:55:32 2015 -0400
----------------------------------------------------------------------
.../gfac/monitor/email/EmailBasedMonitor.java | 84 ++++++++++++++------
1 file changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/59e36d89/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index a9363cb..f9b7eb5 100644
--- a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -59,12 +59,19 @@ import javax.mail.search.FlagTerm;
import javax.mail.search.SearchTerm;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
public class EmailBasedMonitor implements Runnable{
private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class);
@@ -82,6 +89,7 @@ public class EmailBasedMonitor implements Runnable{
private String host, emailAddress, password, storeProtocol, folderName ;
private Date monitorStartDate;
private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
+ private ExecutorService executor;
public EmailBasedMonitor(ResourceJobManagerType type) throws AiravataException {
init();
@@ -99,6 +107,8 @@ public class EmailBasedMonitor implements Runnable{
}
properties = new Properties();
properties.put("mail.store.protocol", storeProtocol);
+ executor = Executors.newFixedThreadPool(30);
+
}
public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
@@ -215,37 +225,59 @@ public class EmailBasedMonitor implements Runnable{
}
private void processMessages(Message[] searchMessages) throws MessagingException {
- List<Message> processedMessages = new ArrayList<>();
- List<Message> unreadMessages = new ArrayList<>();
+ List<Message> processedMessages = Collections.synchronizedList(new ArrayList<>());
+ List<Message> unreadMessages = Collections.synchronizedList(new ArrayList<>());
+ List<Future<JobStatusResult>> futureList = new ArrayList<>();
+
+ // use thread pool to increase email processing
for (Message message : searchMessages) {
- try {
- JobStatusResult jobStatusResult = parse(message);
- JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
- if (jEC == null) {
- jEC = jobMonitorMap.get(jobStatusResult.getJobName());
- }
- if (jEC != null) {
- process(jobStatusResult, jEC);
- processedMessages.add(message);
- } else {
- // we can get JobExecutionContext null in multiple Gfac instances environment,
- // where this job is not submitted by this Gfac instance hence we ignore this message.
- unreadMessages.add(message);
+ Future<JobStatusResult> jobStatusFuture = executor.submit(new Callable<JobStatusResult>() {
+ JobStatusResult jobStatusResult = null;
+ @Override
+ public JobStatusResult call() throws Exception {
+ try {
+ jobStatusResult = parse(message);
+ JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
+ if (jEC == null) {
+ jEC = jobMonitorMap.get(jobStatusResult.getJobName());
+ }
+ if (jEC != null) {
+ process(jobStatusResult, jEC);
+ processedMessages.add(message);
+ } else {
+ // we can get JobExecutionContext null in multiple Gfac instances environment,
+ // where this job is not submitted by this Gfac instance hence we ignore this message.
+ unreadMessages.add(message);
// log.info("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId());
+ }
+ } catch (AiravataException e) {
+ log.error("[EJM]: Error parsing email message =====================================>", e);
+ try {
+ writeEnvelopeOnError(message);
+ } catch (MessagingException e1) {
+ log.error("[EJM]: Error printing envelop of the email");
+ }
+ unreadMessages.add(message);
+ } catch (MessagingException e) {
+ log.error("[EJM]: Error while retrieving sender address from message : " + message.toString());
+ unreadMessages.add(message);
+ }
+ return jobStatusResult;
}
- } catch (AiravataException e) {
- log.error("[EJM]: Error parsing email message =====================================>", e);
- try {
- writeEnvelopeOnError(message);
- } catch (MessagingException e1) {
- log.error("[EJM]: Error printing envelop of the email");
- }
- unreadMessages.add(message);
- } catch (MessagingException e) {
- log.error("[EJM]: Error while retrieving sender address from message : " + message.toString());
- unreadMessages.add(message);
+ });
+ futureList.add(jobStatusFuture);
+ }
+ // wait until all thread returns
+ for (Future<JobStatusResult> jobStatusResultFuture : futureList) {
+ try {
+ jobStatusResultFuture.get();
+ } catch (InterruptedException e) {
+ log.error("Error while calling future.get() ", e);
+ } catch (ExecutionException e) {
+ log.error("Error while calling future.get()", e);
}
}
+
if (!processedMessages.isEmpty()) {
Message[] seenMessages = new Message[processedMessages.size()];
processedMessages.toArray(seenMessages);