You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/20 19:56:03 UTC
svn commit: r1552782 -
/manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java
Author: kwright
Date: Fri Dec 20 18:56:02 2013
New Revision: 1552782
URL: http://svn.apache.org/r1552782
Log:
Add background threads everywhere needed
Modified:
manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java
Modified: manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java?rev=1552782&r1=1552781&r2=1552782&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java Fri Dec 20 18:56:02 2013
@@ -78,7 +78,7 @@ public class EmailConnector extends org.
protected String username = null;
protected String password = null;
protected String protocol = null;
- protected Properties properties = new Properties();
+ protected Properties properties = null;
// Local session handle
protected EmailSession session = null;
@@ -108,6 +108,7 @@ public class EmailConnector extends org.
this.protocol = configParameters.getParameter(EmailConfig.PROTOCOL_PARAM);
this.username = configParameters.getParameter(EmailConfig.USERNAME_PARAM);
this.password = configParameters.getParameter(EmailConfig.PASSWORD_PARAM);
+ this.properties = new Properties();
int i = 0;
while (i < configParameters.getChildCount()) //In post property set is added as a configuration node
{
@@ -173,7 +174,11 @@ public class EmailConnector extends org.
finalizeConnection();
getSession();
try {
- session.checkConnection();
+ CheckConnectionThread cct = new CheckConnectionThread(session);
+ cct.start();
+ cct.finishUp();
+ } catch (InterruptedException e) {
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
} catch (MessagingException e) {
handleMessagingException(e,"checking the connection");
}
@@ -280,7 +285,9 @@ public class EmailConnector extends org.
for (String folderName : folderNames)
{
try {
- Folder folder = session.openFolder(folderName);
+ OpenFolderThread oft = new OpenFolderThread(session, folderName);
+ oft.start();
+ Folder folder = oft.finishUp();
try
{
Message[] messages = findMessages(folder, startTime, endTime, findMap);
@@ -291,8 +298,12 @@ public class EmailConnector extends org.
}
finally
{
- session.closeFolder(folder);
+ CloseFolderThread cft = new CloseFolderThread(session, folder);
+ cft.start();
+ cft.finishUp();
}
+ } catch (InterruptedException e) {
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
} catch (MessagingException e) {
handleMessagingException(e, "finding emails");
}
@@ -303,7 +314,8 @@ public class EmailConnector extends org.
/*
This method will return the list of messages which matches the given criteria
*/
- private Message[] findMessages(Folder folder, long startTime, long endTime, Map<String,String> findMap) throws MessagingException {
+ private Message[] findMessages(Folder folder, long startTime, long endTime, Map<String,String> findMap)
+ throws MessagingException, InterruptedException {
String findParameterName;
String findParameterValue;
@@ -343,9 +355,17 @@ public class EmailConnector extends org.
Message[] result;
if (searchTerm == null)
- result = session.getMessages(folder);
+ {
+ GetMessagesThread gmt = new GetMessagesThread(session, folder);
+ gmt.start();
+ result = gmt.finishUp();
+ }
else
- result = session.search(folder, searchTerm);
+ {
+ SearchMessagesThread smt = new SearchMessagesThread(session, folder, searchTerm);
+ smt.start();
+ result = smt.finishUp();
+ }
return result;
}
@@ -368,8 +388,12 @@ public class EmailConnector extends org.
port = -1;
try {
- session = new EmailSession(server, port, username, password,
+ ConnectThread connectThread = new ConnectThread(server, port, username, password,
providerMap.get(protocol), properties);
+ connectThread.start();
+ session = connectThread.finishUp();
+ } catch (InterruptedException e) {
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
} catch (MessagingException e) {
handleMessagingException(e, "connecting");
}
@@ -380,7 +404,10 @@ public class EmailConnector extends org.
protected void finalizeConnection() {
if (session != null) {
try {
- session.close();
+ CloseSessionThread closeSessionThread = new CloseSessionThread(session);
+ closeSessionThread.start();
+ closeSessionThread.finishUp();
+ } catch (InterruptedException e) {
} catch (MessagingException e) {
Logging.connectors.warn("Error while closing connection to server: " + e.getMessage(),e);
} finally {
@@ -476,7 +503,9 @@ public class EmailConnector extends org.
Folder folder = openFolders.get(folderName);
if (folder == null)
{
- folder = session.openFolder(folderName);
+ OpenFolderThread oft = new OpenFolderThread(session, folderName);
+ oft.start();
+ folder = oft.finishUp();
openFolders.put(folderName,folder);
}
@@ -485,10 +514,11 @@ public class EmailConnector extends org.
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("Email: Processing document identifier '"
+ compositeID + "'");
- MessageIDTerm messageIDTerm = new MessageIDTerm(id);
- Message[] message = null;
-
- message = session.search(folder, messageIDTerm);
+ SearchTerm messageIDTerm = new MessageIDTerm(id);
+
+ SearchMessagesThread smt = new SearchMessagesThread(session, folder, messageIDTerm);
+ smt.start();
+ Message[] message = smt.finishUp();
for (Message msg : message) {
RepositoryDocument rd = new RepositoryDocument();
@@ -571,6 +601,8 @@ public class EmailConnector extends org.
activities.ingestDocument(id, version, documentURI, rd);
}
+ } catch (InterruptedException e) {
+ throw new ManifoldCFException(e.getMessage(), ManifoldCFException.INTERRUPTED);
} catch (MessagingException e) {
handleMessagingException(e, "processing email");
} catch (IOException e) {
@@ -587,7 +619,13 @@ public class EmailConnector extends org.
{
try
{
- session.closeFolder(f);
+ CloseFolderThread cft = new CloseFolderThread(session, f);
+ cft.start();
+ cft.finishUp();
+ }
+ catch (InterruptedException e)
+ {
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
}
catch (MessagingException e)
{
@@ -1035,7 +1073,13 @@ public class EmailConnector extends org.
getSession();
try
{
- return session.listFolders();
+ ListFoldersThread lft = new ListFoldersThread(session);
+ lft.start();
+ return lft.finishUp();
+ }
+ catch (InterruptedException e)
+ {
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
}
catch (MessagingException e)
{
@@ -1197,4 +1241,439 @@ public class EmailConnector extends org.
}
}
+ /** Class to set up connection.
+ */
+ protected static class ConnectThread extends Thread
+ {
+ protected final String server;
+ protected final int port;
+ protected final String username;
+ protected final String password;
+ protected final String protocol;
+ protected final Properties properties;
+
+ // Local session handle
+ protected EmailSession session = null;
+ protected Throwable exception = null;
+
+ public ConnectThread(String server, int port, String username, String password, String protocol, Properties properties)
+ {
+ this.server = server;
+ this.port = port;
+ this.username = username;
+ this.password = password;
+ this.protocol = protocol;
+ this.properties = properties;
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ session = new EmailSession(server, port, username, password, protocol, properties);
+ }
+ catch (Throwable e)
+ {
+ exception = e;
+ }
+ }
+
+ public EmailSession finishUp()
+ throws MessagingException, InterruptedException
+ {
+ try
+ {
+ join();
+ if (exception != null)
+ {
+ if (exception instanceof RuntimeException)
+ throw (RuntimeException)exception;
+ else if (exception instanceof Error)
+ throw (Error)exception;
+ else if (exception instanceof MessagingException)
+ throw (MessagingException)exception;
+ else
+ throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+ return session;
+ } catch (InterruptedException e) {
+ this.interrupt();
+ throw e;
+ }
+ }
+ }
+
+ /** Class to close the session.
+ */
+ protected static class CloseSessionThread extends Thread
+ {
+ protected final EmailSession session;
+
+ protected Throwable exception = null;
+
+ public CloseSessionThread(EmailSession session)
+ {
+ this.session = session;
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ exception = e;
+ }
+ }
+
+ public void finishUp()
+ throws MessagingException, InterruptedException
+ {
+ try
+ {
+ join();
+ if (exception != null)
+ {
+ if (exception instanceof RuntimeException)
+ throw (RuntimeException)exception;
+ else if (exception instanceof Error)
+ throw (Error)exception;
+ else if (exception instanceof MessagingException)
+ throw (MessagingException)exception;
+ else
+ throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+ } catch (InterruptedException e) {
+ this.interrupt();
+ throw e;
+ }
+ }
+ }
+
+ /** Class to list all folders.
+ */
+ protected static class ListFoldersThread extends Thread
+ {
+ protected final EmailSession session;
+
+ protected String[] rval = null;
+ protected Throwable exception = null;
+
+ public ListFoldersThread(EmailSession session)
+ {
+ this.session = session;
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ rval = session.listFolders();
+ }
+ catch (Throwable e)
+ {
+ exception = e;
+ }
+ }
+
+ public String[] finishUp()
+ throws MessagingException, InterruptedException
+ {
+ try
+ {
+ join();
+ if (exception != null)
+ {
+ if (exception instanceof RuntimeException)
+ throw (RuntimeException)exception;
+ else if (exception instanceof Error)
+ throw (Error)exception;
+ else if (exception instanceof MessagingException)
+ throw (MessagingException)exception;
+ else
+ throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+ return rval;
+ } catch (InterruptedException e) {
+ this.interrupt();
+ throw e;
+ }
+ }
+ }
+
+ /** Class to check the connection.
+ */
+ protected static class CheckConnectionThread extends Thread
+ {
+ protected final EmailSession session;
+
+ protected Throwable exception = null;
+
+ public CheckConnectionThread(EmailSession session)
+ {
+ this.session = session;
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ session.checkConnection();
+ }
+ catch (Throwable e)
+ {
+ exception = e;
+ }
+ }
+
+ public void finishUp()
+ throws MessagingException, InterruptedException
+ {
+ try
+ {
+ join();
+ if (exception != null)
+ {
+ if (exception instanceof RuntimeException)
+ throw (RuntimeException)exception;
+ else if (exception instanceof Error)
+ throw (Error)exception;
+ else if (exception instanceof MessagingException)
+ throw (MessagingException)exception;
+ else
+ throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+ } catch (InterruptedException e) {
+ this.interrupt();
+ throw e;
+ }
+ }
+ }
+
+ /** Class to open a folder.
+ */
+ protected static class OpenFolderThread extends Thread
+ {
+ protected final EmailSession session;
+ protected final String folderName;
+
+ // Local folder
+ protected Folder folder = null;
+ protected Throwable exception = null;
+
+ public OpenFolderThread(EmailSession session, String folderName)
+ {
+ this.session = session;
+ this.folderName = folderName;
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ folder = session.openFolder(folderName);
+ }
+ catch (Throwable e)
+ {
+ exception = e;
+ }
+ }
+
+ public Folder finishUp()
+ throws MessagingException, InterruptedException
+ {
+ try
+ {
+ join();
+ if (exception != null)
+ {
+ if (exception instanceof RuntimeException)
+ throw (RuntimeException)exception;
+ else if (exception instanceof Error)
+ throw (Error)exception;
+ else if (exception instanceof MessagingException)
+ throw (MessagingException)exception;
+ else
+ throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+ return folder;
+ } catch (InterruptedException e) {
+ this.interrupt();
+ throw e;
+ }
+ }
+ }
+
+ /** Class to close a folder.
+ */
+ protected static class CloseFolderThread extends Thread
+ {
+ protected final EmailSession session;
+ protected final Folder folder;
+
+ // Local folder
+ protected Throwable exception = null;
+
+ public CloseFolderThread(EmailSession session, Folder folder)
+ {
+ this.session = session;
+ this.folder = folder;
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ session.closeFolder(folder);
+ }
+ catch (Throwable e)
+ {
+ exception = e;
+ }
+ }
+
+ public void finishUp()
+ throws MessagingException, InterruptedException
+ {
+ try
+ {
+ join();
+ if (exception != null)
+ {
+ if (exception instanceof RuntimeException)
+ throw (RuntimeException)exception;
+ else if (exception instanceof Error)
+ throw (Error)exception;
+ else if (exception instanceof MessagingException)
+ throw (MessagingException)exception;
+ else
+ throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+ } catch (InterruptedException e) {
+ this.interrupt();
+ throw e;
+ }
+ }
+ }
+
+ /** Class to get all messages from a folder.
+ */
+ protected static class GetMessagesThread extends Thread
+ {
+ protected final EmailSession session;
+ protected final Folder folder;
+
+ // Local messages
+ protected Message[] messages = null;
+ protected Throwable exception = null;
+
+ public GetMessagesThread(EmailSession session, Folder folder)
+ {
+ this.session = session;
+ this.folder = folder;
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ messages = session.getMessages(folder);
+ }
+ catch (Throwable e)
+ {
+ exception = e;
+ }
+ }
+
+ public Message[] finishUp()
+ throws MessagingException, InterruptedException
+ {
+ try
+ {
+ join();
+ if (exception != null)
+ {
+ if (exception instanceof RuntimeException)
+ throw (RuntimeException)exception;
+ else if (exception instanceof Error)
+ throw (Error)exception;
+ else if (exception instanceof MessagingException)
+ throw (MessagingException)exception;
+ else
+ throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+ return messages;
+ } catch (InterruptedException e) {
+ this.interrupt();
+ throw e;
+ }
+ }
+ }
+
+ /** Class to search for messages in a folder.
+ */
+ protected static class SearchMessagesThread extends Thread
+ {
+ protected final EmailSession session;
+ protected final Folder folder;
+ protected final SearchTerm searchTerm;
+
+ // Local messages
+ protected Message[] messages = null;
+ protected Throwable exception = null;
+
+ public SearchMessagesThread(EmailSession session, Folder folder, SearchTerm searchTerm)
+ {
+ this.session = session;
+ this.folder = folder;
+ this.searchTerm = searchTerm;
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ messages = session.search(folder, searchTerm);
+ }
+ catch (Throwable e)
+ {
+ exception = e;
+ }
+ }
+
+ public Message[] finishUp()
+ throws MessagingException, InterruptedException
+ {
+ try
+ {
+ join();
+ if (exception != null)
+ {
+ if (exception instanceof RuntimeException)
+ throw (RuntimeException)exception;
+ else if (exception instanceof Error)
+ throw (Error)exception;
+ else if (exception instanceof MessagingException)
+ throw (MessagingException)exception;
+ else
+ throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+ return messages;
+ } catch (InterruptedException e) {
+ this.interrupt();
+ throw e;
+ }
+ }
+ }
+
}
\ No newline at end of file