You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/20 19:56:03 UTC

svn commit: r1552782 - /manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java

Author: kwright
Date: Fri Dec 20 18:56:02 2013
New Revision: 1552782

URL: http://svn.apache.org/r1552782
Log:
Add background threads everywhere needed

Modified:
    manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java

Modified: manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java?rev=1552782&r1=1552781&r2=1552782&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-553/connectors/email/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/email/EmailConnector.java Fri Dec 20 18:56:02 2013
@@ -78,7 +78,7 @@ public class EmailConnector extends org.
   protected String username = null;
   protected String password = null;
   protected String protocol = null;
-  protected Properties properties = new Properties();
+  protected Properties properties = null;
   
   // Local session handle
   protected EmailSession session = null;
@@ -108,6 +108,7 @@ public class EmailConnector extends org.
     this.protocol = configParameters.getParameter(EmailConfig.PROTOCOL_PARAM);
     this.username = configParameters.getParameter(EmailConfig.USERNAME_PARAM);
     this.password = configParameters.getParameter(EmailConfig.PASSWORD_PARAM);
+    this.properties = new Properties();
     int i = 0;
     while (i < configParameters.getChildCount()) //In post property set is added as a configuration node
     {
@@ -173,7 +174,11 @@ public class EmailConnector extends org.
     finalizeConnection();
     getSession();
     try {
-      session.checkConnection();
+      CheckConnectionThread cct = new CheckConnectionThread(session);
+      cct.start();
+      cct.finishUp();
+    } catch (InterruptedException e) {
+      throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
     } catch (MessagingException e) {
       handleMessagingException(e,"checking the connection");
     }
@@ -280,7 +285,9 @@ public class EmailConnector extends org.
     for (String folderName : folderNames)
     {
       try {
-        Folder folder = session.openFolder(folderName);
+        OpenFolderThread oft = new OpenFolderThread(session, folderName);
+        oft.start();
+        Folder folder = oft.finishUp();
         try
         {
           Message[] messages = findMessages(folder, startTime, endTime, findMap);
@@ -291,8 +298,12 @@ public class EmailConnector extends org.
         }
         finally
         {
-          session.closeFolder(folder);
+          CloseFolderThread cft = new CloseFolderThread(session, folder);
+          cft.start();
+          cft.finishUp();
         }
+      } catch (InterruptedException e) {
+        throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
       } catch (MessagingException e) {
         handleMessagingException(e, "finding emails");
       }
@@ -303,7 +314,8 @@ public class EmailConnector extends org.
   /*
   This method will return the list of messages which matches the given criteria
   */
-  private Message[] findMessages(Folder folder, long startTime, long endTime, Map<String,String> findMap) throws MessagingException {
+  private Message[] findMessages(Folder folder, long startTime, long endTime, Map<String,String> findMap)
+    throws MessagingException, InterruptedException {
     String findParameterName;
     String findParameterValue;
     
@@ -343,9 +355,17 @@ public class EmailConnector extends org.
     
     Message[] result;
     if (searchTerm == null)
-      result = session.getMessages(folder);
+    {
+      GetMessagesThread gmt = new GetMessagesThread(session, folder);
+      gmt.start();
+      result = gmt.finishUp();
+    }
     else
-      result = session.search(folder, searchTerm);
+    {
+      SearchMessagesThread smt = new SearchMessagesThread(session, folder, searchTerm);
+      smt.start();
+      result = smt.finishUp();
+    }
     return result;
   }
 
@@ -368,8 +388,12 @@ public class EmailConnector extends org.
         port = -1;
 
       try {
-        session = new EmailSession(server, port, username, password,
+        ConnectThread connectThread = new ConnectThread(server, port, username, password,
           providerMap.get(protocol), properties);
+        connectThread.start();
+        session = connectThread.finishUp();
+      } catch (InterruptedException e) {
+        throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
       } catch (MessagingException e) {
         handleMessagingException(e, "connecting");
       }
@@ -380,7 +404,10 @@ public class EmailConnector extends org.
   protected void finalizeConnection() {
     if (session != null) {
       try {
-        session.close();
+        CloseSessionThread closeSessionThread = new CloseSessionThread(session);
+        closeSessionThread.start();
+        closeSessionThread.finishUp();
+      } catch (InterruptedException e) {
       } catch (MessagingException e) {
         Logging.connectors.warn("Error while closing connection to server: " + e.getMessage(),e);
       } finally {
@@ -476,7 +503,9 @@ public class EmailConnector extends org.
           Folder folder = openFolders.get(folderName);
           if (folder == null)
           {
-            folder = session.openFolder(folderName);
+            OpenFolderThread oft = new OpenFolderThread(session, folderName);
+            oft.start();
+            folder = oft.finishUp();
             openFolders.put(folderName,folder);
           }
           
@@ -485,10 +514,11 @@ public class EmailConnector extends org.
           if (Logging.connectors.isDebugEnabled())
             Logging.connectors.debug("Email: Processing document identifier '"
               + compositeID + "'");
-          MessageIDTerm messageIDTerm = new MessageIDTerm(id);
-          Message[] message = null;
-
-          message = session.search(folder, messageIDTerm);
+          SearchTerm messageIDTerm = new MessageIDTerm(id);
+          
+          SearchMessagesThread smt = new SearchMessagesThread(session, folder, messageIDTerm);
+          smt.start();
+          Message[] message = smt.finishUp();
 
           for (Message msg : message) {
             RepositoryDocument rd = new RepositoryDocument();
@@ -571,6 +601,8 @@ public class EmailConnector extends org.
             activities.ingestDocument(id, version, documentURI, rd);
 
           }
+        } catch (InterruptedException e) {
+          throw new ManifoldCFException(e.getMessage(), ManifoldCFException.INTERRUPTED);
         } catch (MessagingException e) {
           handleMessagingException(e, "processing email");
         } catch (IOException e) {
@@ -587,7 +619,13 @@ public class EmailConnector extends org.
       {
         try
         {
-          session.closeFolder(f);
+          CloseFolderThread cft = new CloseFolderThread(session, f);
+          cft.start();
+          cft.finishUp();
+        }
+        catch (InterruptedException e)
+        {
+          throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
         }
         catch (MessagingException e)
         {
@@ -1035,7 +1073,13 @@ public class EmailConnector extends org.
     getSession();
     try
     {
-      return session.listFolders();
+      ListFoldersThread lft = new ListFoldersThread(session);
+      lft.start();
+      return lft.finishUp();
+    }
+    catch (InterruptedException e)
+    {
+      throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
     }
     catch (MessagingException e)
     {
@@ -1197,4 +1241,439 @@ public class EmailConnector extends org.
     }
   }
 
+  /** Class to set up connection.
+  */
+  protected static class ConnectThread extends Thread
+  {
+    protected final String server;
+    protected final int port;
+    protected final String username;
+    protected final String password;
+    protected final String protocol;
+    protected final Properties properties;
+    
+    // Local session handle
+    protected EmailSession session = null;
+    protected Throwable exception = null;
+    
+    public ConnectThread(String server, int port, String username, String password, String protocol, Properties properties)
+    {
+      this.server = server;
+      this.port = port;
+      this.username = username;
+      this.password = password;
+      this.protocol = protocol;
+      this.properties = properties;
+      setDaemon(true);
+    }
+    
+    public void run()
+    {
+      try
+      {
+        session = new EmailSession(server, port, username, password, protocol, properties);
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+    
+    public EmailSession finishUp()
+      throws MessagingException, InterruptedException
+    {
+      try
+      {
+        join();
+        if (exception != null)
+        {
+          if (exception instanceof RuntimeException)
+            throw (RuntimeException)exception;
+          else if (exception instanceof Error)
+            throw (Error)exception;
+          else if (exception instanceof MessagingException)
+            throw (MessagingException)exception;
+          else
+            throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+        }
+        return session;
+      } catch (InterruptedException e) {
+        this.interrupt();
+        throw e;
+      }
+    }
+  }
+
+  /** Class to close the session.
+  */
+  protected static class CloseSessionThread extends Thread
+  {
+    protected final EmailSession session;
+    
+    protected Throwable exception = null;
+    
+    public CloseSessionThread(EmailSession session)
+    {
+      this.session = session;
+      setDaemon(true);
+    }
+    
+    public void run()
+    {
+      try
+      {
+        session.close();
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+    
+    public void finishUp()
+      throws MessagingException, InterruptedException
+    {
+      try
+      {
+        join();
+        if (exception != null)
+        {
+          if (exception instanceof RuntimeException)
+            throw (RuntimeException)exception;
+          else if (exception instanceof Error)
+            throw (Error)exception;
+          else if (exception instanceof MessagingException)
+            throw (MessagingException)exception;
+          else
+            throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+        }
+      } catch (InterruptedException e) {
+        this.interrupt();
+        throw e;
+      }
+    }
+  }
+
+  /** Class to list all folders.
+  */
+  protected static class ListFoldersThread extends Thread
+  {
+    protected final EmailSession session;
+    
+    protected String[] rval = null;
+    protected Throwable exception = null;
+    
+    public ListFoldersThread(EmailSession session)
+    {
+      this.session = session;
+      setDaemon(true);
+    }
+    
+    public void run()
+    {
+      try
+      {
+        rval = session.listFolders();
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+    
+    public String[] finishUp()
+      throws MessagingException, InterruptedException
+    {
+      try
+      {
+        join();
+        if (exception != null)
+        {
+          if (exception instanceof RuntimeException)
+            throw (RuntimeException)exception;
+          else if (exception instanceof Error)
+            throw (Error)exception;
+          else if (exception instanceof MessagingException)
+            throw (MessagingException)exception;
+          else
+            throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+        }
+        return rval;
+      } catch (InterruptedException e) {
+        this.interrupt();
+        throw e;
+      }
+    }
+  }
+
+  /** Class to check the connection.
+  */
+  protected static class CheckConnectionThread extends Thread
+  {
+    protected final EmailSession session;
+    
+    protected Throwable exception = null;
+    
+    public CheckConnectionThread(EmailSession session)
+    {
+      this.session = session;
+      setDaemon(true);
+    }
+    
+    public void run()
+    {
+      try
+      {
+        session.checkConnection();
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+    
+    public void finishUp()
+      throws MessagingException, InterruptedException
+    {
+      try
+      {
+        join();
+        if (exception != null)
+        {
+          if (exception instanceof RuntimeException)
+            throw (RuntimeException)exception;
+          else if (exception instanceof Error)
+            throw (Error)exception;
+          else if (exception instanceof MessagingException)
+            throw (MessagingException)exception;
+          else
+            throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+        }
+      } catch (InterruptedException e) {
+        this.interrupt();
+        throw e;
+      }
+    }
+  }
+
+  /** Class to open a folder.
+  */
+  protected static class OpenFolderThread extends Thread
+  {
+    protected final EmailSession session;
+    protected final String folderName;
+    
+    // Local folder
+    protected Folder folder = null;
+    protected Throwable exception = null;
+    
+    public OpenFolderThread(EmailSession session, String folderName)
+    {
+      this.session = session;
+      this.folderName = folderName;
+      setDaemon(true);
+    }
+    
+    public void run()
+    {
+      try
+      {
+        folder = session.openFolder(folderName);
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+    
+    public Folder finishUp()
+      throws MessagingException, InterruptedException
+    {
+      try
+      {
+        join();
+        if (exception != null)
+        {
+          if (exception instanceof RuntimeException)
+            throw (RuntimeException)exception;
+          else if (exception instanceof Error)
+            throw (Error)exception;
+          else if (exception instanceof MessagingException)
+            throw (MessagingException)exception;
+          else
+            throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+        }
+        return folder;
+      } catch (InterruptedException e) {
+        this.interrupt();
+        throw e;
+      }
+    }
+  }
+  
+  /** Class to close a folder.
+  */
+  protected static class CloseFolderThread extends Thread
+  {
+    protected final EmailSession session;
+    protected final Folder folder;
+    
+    // Local folder
+    protected Throwable exception = null;
+    
+    public CloseFolderThread(EmailSession session, Folder folder)
+    {
+      this.session = session;
+      this.folder = folder;
+      setDaemon(true);
+    }
+    
+    public void run()
+    {
+      try
+      {
+        session.closeFolder(folder);
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+    
+    public void finishUp()
+      throws MessagingException, InterruptedException
+    {
+      try
+      {
+        join();
+        if (exception != null)
+        {
+          if (exception instanceof RuntimeException)
+            throw (RuntimeException)exception;
+          else if (exception instanceof Error)
+            throw (Error)exception;
+          else if (exception instanceof MessagingException)
+            throw (MessagingException)exception;
+          else
+            throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+        }
+      } catch (InterruptedException e) {
+        this.interrupt();
+        throw e;
+      }
+    }
+  }
+
+  /** Class to get all messages from a folder.
+  */
+  protected static class GetMessagesThread extends Thread
+  {
+    protected final EmailSession session;
+    protected final Folder folder;
+    
+    // Local messages
+    protected Message[] messages = null;
+    protected Throwable exception = null;
+    
+    public GetMessagesThread(EmailSession session, Folder folder)
+    {
+      this.session = session;
+      this.folder = folder;
+      setDaemon(true);
+    }
+    
+    public void run()
+    {
+      try
+      {
+        messages = session.getMessages(folder);
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+    
+    public Message[] finishUp()
+      throws MessagingException, InterruptedException
+    {
+      try
+      {
+        join();
+        if (exception != null)
+        {
+          if (exception instanceof RuntimeException)
+            throw (RuntimeException)exception;
+          else if (exception instanceof Error)
+            throw (Error)exception;
+          else if (exception instanceof MessagingException)
+            throw (MessagingException)exception;
+          else
+            throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+        }
+        return messages;
+      } catch (InterruptedException e) {
+        this.interrupt();
+        throw e;
+      }
+    }
+  }
+
+  /** Class to search for messages in a folder.
+  */
+  protected static class SearchMessagesThread extends Thread
+  {
+    protected final EmailSession session;
+    protected final Folder folder;
+    protected final SearchTerm searchTerm;
+    
+    // Local messages
+    protected Message[] messages = null;
+    protected Throwable exception = null;
+    
+    public SearchMessagesThread(EmailSession session, Folder folder, SearchTerm searchTerm)
+    {
+      this.session = session;
+      this.folder = folder;
+      this.searchTerm = searchTerm;
+      setDaemon(true);
+    }
+    
+    public void run()
+    {
+      try
+      {
+        messages = session.search(folder, searchTerm);
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+    
+    public Message[] finishUp()
+      throws MessagingException, InterruptedException
+    {
+      try
+      {
+        join();
+        if (exception != null)
+        {
+          if (exception instanceof RuntimeException)
+            throw (RuntimeException)exception;
+          else if (exception instanceof Error)
+            throw (Error)exception;
+          else if (exception instanceof MessagingException)
+            throw (MessagingException)exception;
+          else
+            throw new RuntimeException("Unknown exception type: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+        }
+        return messages;
+      } catch (InterruptedException e) {
+        this.interrupt();
+        throw e;
+      }
+    }
+  }
+
 }
\ No newline at end of file