You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/23 23:33:50 UTC

svn commit: r1062556 - in /incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler: interfaces/IJobManager.java jobs/JobManager.java jobs/Jobs.java system/JobNotificationThread.java system/StartDeleteThread.java

Author: kwright
Date: Sun Jan 23 22:33:49 2011
New Revision: 1062556

URL: http://svn.apache.org/viewvc?rev=1062556&view=rev
Log:
Fix for CONNECTORS-152.  I created a new job state so that the job notification thread could change the state when the job was selected for notification.  This will allow multiple job notification threads if required in the future.

Modified:
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1062556&r1=1062555&r2=1062556&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Sun Jan 23 22:33:49 2011
@@ -168,6 +168,11 @@ public interface IJobManager
   public void resetDeleteStartupWorkerStatus()
     throws ManifoldCFException;
 
+  /** Reset as part of restoring notification threads.
+  */
+  public void resetNotificationWorkerStatus()
+    throws ManifoldCFException;
+
   /** Reset as part of restoring startup threads.
   */
   public void resetStartupWorkerStatus()
@@ -663,7 +668,7 @@ public interface IJobManager
   /** Find the list of jobs that need to have their connectors notified of job completion.
   *@return the ID's of jobs that need their output connectors notified in order to become inactive.
   */
-  public Long[] getJobsReadyForInactivity()
+  public JobStartRecord[] getJobsReadyForInactivity()
     throws ManifoldCFException;
 
   /** Inactivate a job, from the notification state.
@@ -679,6 +684,13 @@ public interface IJobManager
   public void resetStartDeleteJob(Long jobID)
     throws ManifoldCFException;
 
+  /** Reset a job that is notifying back to "ready for notify"
+  * state.
+  *@param jobID is the job id.
+  */
+  public void resetNotifyJob(Long jobID)
+    throws ManifoldCFException;
+
   /** Reset a starting job back to "ready for startup" state.
   *@param jobID is the job id.
   */

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1062556&r1=1062555&r2=1062556&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Sun Jan 23 22:33:49 2011
@@ -756,6 +756,16 @@ public class JobManager implements IJobM
     Logging.jobs.debug("Reset complete");
   }
 
+  /** Reset as part of restoring notification threads.
+  */
+  public void resetNotificationWorkerStatus()
+    throws ManifoldCFException
+  {
+    Logging.jobs.debug("Resetting notification up status");
+    jobs.resetNotificationWorkerStatus();
+    Logging.jobs.debug("Reset complete");
+  }
+
   /** Reset as part of restoring startup threads.
   */
   public void resetStartupWorkerStatus()
@@ -5468,6 +5478,68 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Reset a job that is notifying back to "ready for notify"
+  * state.
+  *@param jobID is the job id.
+  */
+  public void resetNotifyJob(Long jobID)
+    throws ManifoldCFException
+  {
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Check job status
+        ArrayList list = new ArrayList();
+        list.add(jobID);
+        IResultSet set = database.performQuery("SELECT "+jobs.statusField+" FROM "+jobs.getTableName()+
+          " WHERE "+jobs.idField+"=? FOR UPDATE",list,null,null);
+        if (set.getRowCount() == 0)
+          throw new ManifoldCFException("No such job: "+jobID);
+        IResultRow row = set.getRow(0);
+        int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+
+        switch (status)
+        {
+        case Jobs.STATUS_NOTIFYINGOFCOMPLETION:
+          if (Logging.jobs.isDebugEnabled())
+            Logging.jobs.debug("Setting job "+jobID+" back to 'ReadyForNotify' state");
+
+          // Set the state of the job back to "ReadyForStartup"
+          jobs.writeStatus(jobID,jobs.STATUS_READYFORDELETE);
+          break;
+        default:
+          throw new ManifoldCFException("Unexpected job status: "+Integer.toString(status));
+        }
+        return;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted resetting notify job: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Reset a starting job back to "ready for startup" state.
   *@param jobID is the job id.
   */
@@ -5875,28 +5947,60 @@ public class JobManager implements IJobM
   /** Find the list of jobs that need to have their connectors notified of job completion.
   *@return the ID's of jobs that need their output connectors notified in order to become inactive.
   */
-  public Long[] getJobsReadyForInactivity()
+  public JobStartRecord[] getJobsReadyForInactivity()
     throws ManifoldCFException
   {
-    // Do the query
-    ArrayList list = new ArrayList();
-    list.add(jobs.statusToString(jobs.STATUS_NOTIFYINGOFCOMPLETION));
-    IResultSet set = database.performQuery("SELECT "+jobs.idField+" FROM "+
-      jobs.getTableName()+" WHERE "+jobs.statusField+"=?",list,null,null);
-    // Return them all
-    Long[] rval = new Long[set.getRowCount()];
-    int i = 0;
-    while (i < rval.length)
+    while (true)
     {
-      IResultRow row = set.getRow(i);
-      Long jobID = (Long)row.getValue(jobs.idField);
-      rval[i++] = jobID;
-      if (Logging.jobs.isDebugEnabled())
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
       {
-        Logging.jobs.debug("Found job "+jobID+" in need of notification");
+        // Do the query
+        ArrayList list = new ArrayList();
+        list.add(jobs.statusToString(jobs.STATUS_READYFORNOTIFY));
+        IResultSet set = database.performQuery("SELECT "+jobs.idField+" FROM "+
+          jobs.getTableName()+" WHERE "+jobs.statusField+"=?",list,null,null);
+        // Return them all
+        JobStartRecord[] rval = new JobStartRecord[set.getRowCount()];
+        int i = 0;
+        while (i < rval.length)
+        {
+          IResultRow row = set.getRow(i);
+          Long jobID = (Long)row.getValue(jobs.idField);
+          // Mark status of job as "starting delete"
+          jobs.writeStatus(jobID,jobs.STATUS_NOTIFYINGOFCOMPLETION);
+          if (Logging.jobs.isDebugEnabled())
+          {
+            Logging.jobs.debug("Found job "+jobID+" in need of notification");
+          }
+          rval[i++] = new JobStartRecord(jobID,0L);
+        }
+        return rval;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted getting jobs ready for notify: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
       }
     }
-    return rval;
   }
   
   /** Complete the sequence that aborts jobs and makes them runnable again.
@@ -6321,6 +6425,7 @@ public class JobManager implements IJobM
       case Jobs.STATUS_SHUTTINGDOWN:
         rstatus = JobStatus.JOBSTATUS_JOBENDCLEANUP;
         break;
+      case Jobs.STATUS_READYFORNOTIFY:
       case Jobs.STATUS_NOTIFYINGOFCOMPLETION:
         rstatus = JobStatus.JOBSTATUS_JOBENDNOTIFICATION;
         break;

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1062556&r1=1062555&r2=1062556&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Sun Jan 23 22:33:49 2011
@@ -50,9 +50,10 @@ public class Jobs extends org.apache.man
   public static final int STATUS_ABORTINGFORRESTART = 16;       // Same as aborting, except after abort is complete startup will happen.
   public static final int STATUS_ABORTINGFORRESTARTSEEDING = 17;  // Seeding version of aborting for restart
   public static final int STATUS_ABORTINGSTARTINGUPFORRESTART = 18; // Starting up version of aborting for restart
-  public static final int STATUS_NOTIFYINGOFCOMPLETION = 19;    // Notifying connector of terminating job (either aborted, or finished)
-  public static final int STATUS_DELETING = 20;                         // The job is deleting.
-  public static final int STATUS_DELETESTARTINGUP = 21;         // The delete is starting up.
+  public static final int STATUS_READYFORNOTIFY = 19;                   // Job is ready to be notified of completion
+  public static final int STATUS_NOTIFYINGOFCOMPLETION = 20;    // Notifying connector of terminating job (either aborted, or finished)
+  public static final int STATUS_DELETING = 21;                         // The job is deleting.
+  public static final int STATUS_DELETESTARTINGUP = 22;         // The delete is starting up.
   
   // These statuses have to do with whether a job has an installed underlying connector or not.
   // There are two reasons to have a special state here: (1) if the behavior of the crawler differs, or (2) if the
@@ -63,13 +64,13 @@ public class Jobs extends org.apache.man
   // But, since there is no indication in the jobs table of an uninstalled connector for such jobs, the code which starts
   // jobs up (or otherwise would enter any state that has a corresponding special state) must check to see if the underlying
   // connector exists before deciding what state to put the job into.
-  public static final int STATUS_ACTIVE_UNINSTALLED = 22;               // Active, but repository connector not installed
-  public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 23;   // Active and seeding, but repository connector not installed
-  public static final int STATUS_ACTIVE_NOOUTPUT = 24;                  // Active, but output connector not installed
-  public static final int STATUS_ACTIVESEEDING_NOOUTPUT = 25;       // Active and seeding, but output connector not installed
-  public static final int STATUS_ACTIVE_NEITHER = 26;                     // Active, but neither repository connector nor output connector installed
-  public static final int STATUS_ACTIVESEEDING_NEITHER = 27;          // Active and seeding, but neither repository connector nor output connector installed
-  public static final int STATUS_DELETING_NOOUTPUT = 28;                // Job is being deleted but there's no output connector installed
+  public static final int STATUS_ACTIVE_UNINSTALLED = 23;               // Active, but repository connector not installed
+  public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 24;   // Active and seeding, but repository connector not installed
+  public static final int STATUS_ACTIVE_NOOUTPUT = 25;                  // Active, but output connector not installed
+  public static final int STATUS_ACTIVESEEDING_NOOUTPUT = 26;       // Active and seeding, but output connector not installed
+  public static final int STATUS_ACTIVE_NEITHER = 27;                     // Active, but neither repository connector nor output connector installed
+  public static final int STATUS_ACTIVESEEDING_NEITHER = 28;          // Active and seeding, but neither repository connector nor output connector installed
+  public static final int STATUS_DELETING_NOOUTPUT = 29;                // Job is being deleted but there's no output connector installed
 
   // Type field values
   public static final int TYPE_CONTINUOUS = IJobDescription.TYPE_CONTINUOUS;
@@ -138,7 +139,8 @@ public class Jobs extends org.apache.man
     statusMap.put("A",new Integer(STATUS_ACTIVE));
     statusMap.put("P",new Integer(STATUS_PAUSED));
     statusMap.put("S",new Integer(STATUS_SHUTTINGDOWN));
-    statusMap.put("s",new Integer(STATUS_NOTIFYINGOFCOMPLETION));
+    statusMap.put("s",new Integer(STATUS_READYFORNOTIFY));
+    statusMap.put("n",new Integer(STATUS_NOTIFYINGOFCOMPLETION));
     statusMap.put("W",new Integer(STATUS_ACTIVEWAIT));
     statusMap.put("Z",new Integer(STATUS_PAUSEDWAIT));
     statusMap.put("X",new Integer(STATUS_ABORTING));
@@ -723,6 +725,12 @@ public class Jobs extends org.apache.man
       map.put(statusField,statusToString(STATUS_READYFORDELETE));
       performUpdate(map,"WHERE "+statusField+"=?",list,invKey);
 
+      // Notifying of completion goes back to just being ready for notify
+      list.clear();
+      list.add(statusToString(STATUS_NOTIFYINGOFCOMPLETION));
+      map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
+      performUpdate(map,"WHERE "+statusField+"=?",list,invKey);
+
       // Starting up or aborting starting up goes back to just being ready
       list.clear();
       list.add(statusToString(STATUS_STARTINGUP));
@@ -1031,6 +1039,21 @@ public class Jobs extends org.apache.man
 
   }
   
+  /** Reset notification worker thread status.
+  */
+  public void resetNotificationWorkerStatus()
+    throws ManifoldCFException
+  {
+    // This resets everything that the job notification thread would resolve.
+
+    ArrayList list = new ArrayList();
+    list.add(statusToString(STATUS_NOTIFYINGOFCOMPLETION));
+    HashMap map = new HashMap();
+    map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
+    performUpdate(map,"WHERE "+statusField+"=?",list,new StringSet(getJobStatusKey()));
+
+  }
+  
   /** Reset startup worker thread status.
   */
   public void resetStartupWorkerStatus()
@@ -1786,7 +1809,7 @@ public class Jobs extends org.apache.man
     ArrayList list = new ArrayList();
     list.add(jobID);
     HashMap map = new HashMap();
-    map.put(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION));
+    map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
     map.put(errorField,null);
     map.put(endTimeField,new Long(finishTime));
     map.put(lastTimeField,new Long(finishTime));
@@ -1805,7 +1828,7 @@ public class Jobs extends org.apache.man
     ArrayList list = new ArrayList();
     list.add(jobID);
     HashMap map = new HashMap();
-    map.put(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION));
+    map.put(statusField,statusToString(STATUS_READYFORNOTIFY));
     map.put(endTimeField,null);
     map.put(lastTimeField,new Long(abortTime));
     map.put(windowEndField,null);
@@ -1990,6 +2013,8 @@ public class Jobs extends org.apache.man
     case STATUS_SHUTTINGDOWN:
       return "S";
     case STATUS_NOTIFYINGOFCOMPLETION:
+      return "n";
+    case STATUS_READYFORNOTIFY:
       return "s";
     case STATUS_ACTIVEWAIT:
       return "W";

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java?rev=1062556&r1=1062555&r2=1062556&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java Sun Jan 23 22:33:49 2011
@@ -32,6 +32,9 @@ public class JobNotificationThread exten
 {
   public static final String _rcsid = "@(#)$Id: JobNotificationThread.java 998081 2010-09-17 11:33:15Z kwright $";
 
+  /** Notification reset manager */
+  protected static NotificationResetManager resetManager = new NotificationResetManager();
+
   /** Constructor.
   */
   public JobNotificationThread()
@@ -44,6 +47,8 @@ public class JobNotificationThread exten
 
   public void run()
   {
+    resetManager.registerMe();
+
     try
     {
       // Create a thread context object.
@@ -58,98 +63,130 @@ public class JobNotificationThread exten
         // Do another try/catch around everything in the loop
         try
         {
-          Long[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity();
-          
-          HashMap connectionNames = new HashMap();
-          
-          int k = 0;
-          while (k < jobsNeedingNotification.length)
+          // Before we begin, conditionally reset
+          resetManager.waitForReset(threadContext);
+
+          JobStartRecord[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity();
+          try
           {
-            Long jobID = jobsNeedingNotification[k++];
-            IJobDescription job = jobManager.load(jobID,true);
-            if (job != null)
+            HashMap connectionNames = new HashMap();
+            
+            int k = 0;
+            while (k < jobsNeedingNotification.length)
             {
-              // Get the connection name
-              String repositoryConnectionName = job.getConnectionName();
-              String outputConnectionName = job.getOutputConnectionName();
-              OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
-              connectionNames.put(c,c);
+              JobStartRecord jsr = jobsNeedingNotification[k++];
+              Long jobID = jsr.getJobID();
+              IJobDescription job = jobManager.load(jobID,true);
+              if (job != null)
+              {
+                // Get the connection name
+                String repositoryConnectionName = job.getConnectionName();
+                String outputConnectionName = job.getOutputConnectionName();
+                OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
+                connectionNames.put(c,c);
+              }
             }
-          }
-          
-          // Attempt to notify the specified connections
-          HashMap notifiedConnections = new HashMap();
-          
-          Iterator iter = connectionNames.keySet().iterator();
-          while (iter.hasNext())
-          {
-            OutputAndRepositoryConnection connections = (OutputAndRepositoryConnection)iter.next();
             
-            String outputConnectionName = connections.getOutputConnectionName();
-            String repositoryConnectionName = connections.getRepositoryConnectionName();
+            // Attempt to notify the specified connections
+            HashMap notifiedConnections = new HashMap();
             
-            OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
-            
-            IOutputConnection connection = connectionManager.load(outputConnectionName);
-            if (connection != null)
+            Iterator iter = connectionNames.keySet().iterator();
+            while (iter.hasNext())
             {
-              // Grab an appropriate connection instance
-              IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
-              if (connector != null)
+              OutputAndRepositoryConnection connections = (OutputAndRepositoryConnection)iter.next();
+              
+              String outputConnectionName = connections.getOutputConnectionName();
+              String repositoryConnectionName = connections.getRepositoryConnectionName();
+              
+              OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
+              
+              IOutputConnection connection = connectionManager.load(outputConnectionName);
+              if (connection != null)
               {
-                try
+                // Grab an appropriate connection instance
+                IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+                if (connector != null)
                 {
-                  // Do the notification itself
                   try
                   {
-                    connector.noteJobComplete(activity);
-                    notifiedConnections.put(connections,connections);
-                  }
-                  catch (ServiceInterruption e)
-                  {
-                    Logging.threads.warn("Service interruption notifying connection - retrying: "+e.getMessage(),e);
-                    continue;
+                    // Do the notification itself
+                    try
+                    {
+                      connector.noteJobComplete(activity);
+                      notifiedConnections.put(connections,connections);
+                    }
+                    catch (ServiceInterruption e)
+                    {
+                      Logging.threads.warn("Service interruption notifying connection - retrying: "+e.getMessage(),e);
+                      continue;
+                    }
+                    catch (ManifoldCFException e)
+                    {
+                      if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+                        throw e;
+                      if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+                        throw e;
+                      if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+                        throw e;
+                      // Nothing special; report the error and keep going.
+                      Logging.threads.error(e.getMessage(),e);
+                      continue;
+                    }
                   }
-                  catch (ManifoldCFException e)
+                  finally
                   {
-                    if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
-                      throw e;
-                    if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
-                      throw e;
-                    if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
-                      throw e;
-                    // Nothing special; report the error and keep going.
-                    Logging.threads.error(e.getMessage(),e);
-                    continue;
+                    OutputConnectorFactory.release(connector);
                   }
                 }
-                finally
+              }
+            }
+            
+            // Go through jobs again, and put the notified ones into the inactive state.
+            k = 0;
+            while (k < jobsNeedingNotification.length)
+            {
+              JobStartRecord jsr = jobsNeedingNotification[k++];
+              Long jobID = jsr.getJobID();
+              IJobDescription job = jobManager.load(jobID,true);
+              if (job != null)
+              {
+                // Get the connection name
+                String outputConnectionName = job.getOutputConnectionName();
+                String repositoryConnectionName = job.getConnectionName();
+                OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
+                
+                if (notifiedConnections.get(c) != null)
                 {
-                  OutputConnectorFactory.release(connector);
+                  // When done, put the job into the Inactive state.  Otherwise, the notification will be retried until it succeeds.
+                  jobManager.inactivateJob(jobID);
+                  jsr.noteStarted();
                 }
               }
             }
           }
-          
-          // Go through jobs again, and put the notified ones into the inactive state.
-          k = 0;
-          while (k < jobsNeedingNotification.length)
+          finally
           {
-            Long jobID = jobsNeedingNotification[k++];
-            IJobDescription job = jobManager.load(jobID,true);
-            if (job != null)
+            // Clean up all jobs that did not start
+            ManifoldCFException exception = null;
+            int i = 0;
+            while (i < jobsNeedingNotification.length)
             {
-              // Get the connection name
-              String outputConnectionName = job.getOutputConnectionName();
-              String repositoryConnectionName = job.getConnectionName();
-              OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
-              
-              if (notifiedConnections.get(c) != null)
+              JobStartRecord jsr = jobsNeedingNotification[i++];
+              if (!jsr.wasStarted())
               {
-                // When done, put the job into the Inactive state.  Otherwise, the notification will be retried until it succeeds.
-                jobManager.inactivateJob(jobID);
+                // Clean up from failed start.
+                try
+                {
+                  jobManager.resetNotifyJob(jsr.getJobID());
+                }
+                catch (ManifoldCFException e)
+                {
+                  exception = e;
+                }
               }
             }
+            if (exception != null)
+              throw exception;
           }
 
           ManifoldCF.sleep(10000L);
@@ -161,6 +198,8 @@ public class JobNotificationThread exten
 
           if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
           {
+            resetManager.noteEvent();
+            
             Logging.threads.error("Job notification thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
             try
             {
@@ -290,4 +329,27 @@ public class JobNotificationThread exten
     }
 
   }
+  
+  /** Class which handles reset for seeding thread pool (of which there's
+  * typically only one member).  The reset action here
+  * is to move the status of jobs back from "seeding" to normal.
+  */
+  protected static class NotificationResetManager extends ResetManager
+  {
+
+    /** Constructor. */
+    public NotificationResetManager()
+    {
+      super();
+    }
+
+    /** Reset */
+    protected void performResetLogic(IThreadContext tc)
+      throws ManifoldCFException
+    {
+      IJobManager jobManager = JobManagerFactory.make(tc);
+      jobManager.resetNotificationWorkerStatus();
+    }
+  }
+
 }

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java?rev=1062556&r1=1062555&r2=1062556&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java Sun Jan 23 22:33:49 2011
@@ -32,12 +32,9 @@ public class StartDeleteThread extends T
 {
   public static final String _rcsid = "@(#)$Id$";
 
-  /** Worker thread pool reset manager */
+  /** Delete startup reset manager */
   protected static DeleteStartupResetManager resetManager = new DeleteStartupResetManager();
 
-  /** The number of documents that are added to the queue per transaction */
-  protected final static int MAX_COUNT = 100;
-
   /** Constructor.
   */
   public StartDeleteThread()