You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2010/09/01 11:48:37 UTC

svn commit: r991491 - /incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java

Author: kwright
Date: Wed Sep  1 09:48:37 2010
New Revision: 991491

URL: http://svn.apache.org/viewvc?rev=991491&view=rev
Log:
Change the job-completion notification logic to handle the case where multiple jobs that share the same output connection finish simultaneously.  In that case only one notification needs to be sent to the connection, after which all of those jobs are made inactive.  Part of CONNECTORS-41.

Modified:
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java?rev=991491&r1=991490&r2=991491&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java Wed Sep  1 09:48:37 2010
@@ -59,6 +59,8 @@ public class JobNotificationThread exten
         {
           Long[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity();
           
+          HashMap connectionNames = new HashMap();
+          
           int k = 0;
           while (k < jobsNeedingNotification.length)
           {
@@ -68,48 +70,77 @@ public class JobNotificationThread exten
             {
               // Get the connection name
               String connectionName = job.getOutputConnectionName();
-              IOutputConnection connection = connectionManager.load(connectionName);
-              if (connection != null)
+              connectionNames.put(connectionName,connectionName);
+            }
+          }
+          
+          // Attempt to notify the specified connections
+          HashMap notifiedConnections = new HashMap();
+          
+          Iterator iter = connectionNames.keySet().iterator();
+          while (iter.hasNext())
+          {
+            String connectionName = (String)iter.next();
+            
+            IOutputConnection connection = connectionManager.load(connectionName);
+            if (connection != null)
+            {
+              // Grab an appropriate connection instance
+              IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+              if (connector != null)
               {
-                // Grab an appropriate connection instance
-                IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
-                if (connector != null)
+                try
                 {
+                  // Do the notification itself
                   try
                   {
-                    // Do the notification itself
-                    try
-                    {
-                      connector.noteJobComplete();
-                    }
-                    catch (ServiceInterruption e)
-                    {
-                      Logging.threads.warn("Service interruption notifying connection - retrying: "+e.getMessage(),e);
-                      continue;
-                    }
-                    catch (ACFException e)
-                    {
-                      if (e.getErrorCode() == ACFException.INTERRUPTED)
-                        throw e;
-                      if (e.getErrorCode() == ACFException.DATABASE_CONNECTION_ERROR)
-                        throw e;
-                      if (e.getErrorCode() == ACFException.SETUP_ERROR)
-                        throw e;
-                      // Nothing special; report the error and keep going.
-                      Logging.threads.error(e.getMessage(),e);
-                      continue;
-                    }
-                    // When done, put the job into the Inactive state.
-                    jobManager.inactivateJob(jobID);
+                    connector.noteJobComplete();
+                    notifiedConnections.put(connectionName,connectionName);
                   }
-                  finally
+                  catch (ServiceInterruption e)
                   {
-                    OutputConnectorFactory.release(connector);
+                    Logging.threads.warn("Service interruption notifying connection - retrying: "+e.getMessage(),e);
+                    continue;
                   }
+                  catch (ACFException e)
+                  {
+                    if (e.getErrorCode() == ACFException.INTERRUPTED)
+                      throw e;
+                    if (e.getErrorCode() == ACFException.DATABASE_CONNECTION_ERROR)
+                      throw e;
+                    if (e.getErrorCode() == ACFException.SETUP_ERROR)
+                      throw e;
+                    // Nothing special; report the error and keep going.
+                    Logging.threads.error(e.getMessage(),e);
+                    continue;
+                  }
+                }
+                finally
+                {
+                  OutputConnectorFactory.release(connector);
                 }
               }
             }
           }
+          
+          // Go through jobs again, and put the notified ones into the inactive state.
+          k = 0;
+          while (k < jobsNeedingNotification.length)
+          {
+            Long jobID = jobsNeedingNotification[k++];
+            IJobDescription job = jobManager.load(jobID,true);
+            if (job != null)
+            {
+              // Get the connection name
+              String connectionName = job.getOutputConnectionName();
+              if (notifiedConnections.get(connectionName) != null)
+              {
+                // When done, put the job into the Inactive state.  Otherwise, the notification will be retried until it succeeds.
+                jobManager.inactivateJob(jobID);
+              }
+            }
+          }
+
           ACF.sleep(10000L);
         }
         catch (ACFException e)