You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/20 18:56:19 UTC

svn commit: r1604213 - in /manifoldcf/trunk: ./ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ framework/pull-agent/src/main/java/org/apache/manifoldcf...

Author: kwright
Date: Fri Jun 20 16:56:19 2014
New Revision: 1604213

URL: http://svn.apache.org/r1604213
Log:
Fix for CONNECTORS-980.

Modified:
    manifoldcf/trunk/   (props changed)
    manifoldcf/trunk/CHANGES.txt
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java

Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
  Merged /manifoldcf/branches/CONNECTORS-980:r1604127-1604211

Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Fri Jun 20 16:56:19 2014
@@ -3,6 +3,9 @@ $Id$
 
 ======================= 1.7-dev =====================
 
+CONNECTORS-980: Output connector gets notified now when job is deleted.
+(Karl Wright)
+
 CONNECTORS-954: Revamp AmazonCloudSearch output connector completely.
 (1) Remove Tika and field mapping, since that would be done upstream in the
 pipeline.

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Fri Jun 20 16:56:19 2014
@@ -548,7 +548,15 @@ public interface IJobManager
   */
   public void retryNotification(JobNotifyRecord jobNotifyRecord, long failTime, int failRetryCount)
     throws ManifoldCFException;
-  
+
+  /** Retry delete notification.
+  *@param jnr is the current job notification record.
+  *@param failTime is the new fail time (-1L if none).
+  *@param failCount is the new fail retry count (-1 if none).
+  */
+  public void retryDeleteNotification(JobNotifyRecord jnr, long failTime, int failCount)
+    throws ManifoldCFException;
+
   /** Add an initial set of documents to the queue.
   * This method is called during job startup, when the queue is being loaded.
   * A set of document references is passed to this method, which updates the status of the document
@@ -825,11 +833,11 @@ public interface IJobManager
   public void resetSeedJob(Long jobID)
     throws ManifoldCFException;
 
-  /** Get the list of jobs that are ready for deletion.
+  /** Get the list of jobs that are ready for delete cleanup.
   *@param processID is the current process ID.
   *@return jobs that were in the "readyfordelete" state.
   */
-  public JobDeleteRecord[] getJobsReadyForDelete(String processID)
+  public JobDeleteRecord[] getJobsReadyForDeleteCleanup(String processID)
     throws ManifoldCFException;
     
   /** Get the list of jobs that are ready for startup.
@@ -846,12 +854,25 @@ public interface IJobManager
   public JobNotifyRecord[] getJobsReadyForInactivity(String processID)
     throws ManifoldCFException;
 
+  /** Find the list of jobs that need to have their connectors notified of job deletion.
+  *@param processID is the process ID.
+  *@return notification records for the jobs that need their output connectors notified in order to be removed.
+  */
+  public JobNotifyRecord[] getJobsReadyForDelete(String processID)
+    throws ManifoldCFException;
+
   /** Inactivate a job, from the notification state.
   *@param jobID is the ID of the job to inactivate.
   */
   public void inactivateJob(Long jobID)
     throws ManifoldCFException;
 
+  /** Remove a job, from the notification state.
+  *@param jobID is the ID of the job to remove.
+  */
+  public void removeJob(Long jobID)
+    throws ManifoldCFException;
+
   /** Reset a job starting for delete back to "ready for delete"
   * state.
   *@param jobID is the job id.
@@ -866,6 +887,13 @@ public interface IJobManager
   public void resetNotifyJob(Long jobID)
     throws ManifoldCFException;
 
+  /** Reset a job that is delete notifying back to "ready for delete notify"
+  * state.
+  *@param jobID is the job id.
+  */
+  public void resetDeleteNotifyJob(Long jobID)
+    throws ManifoldCFException;
+
   /** Reset a starting job back to "ready for startup" state.
   *@param jobID is the job id.
   */

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Fri Jun 20 16:56:19 2014
@@ -4299,7 +4299,68 @@ public class JobManager implements IJobM
     }
 
   }
-  
+
+  /** Retry delete notification.
+  *@param jnr is the current job notification record.
+  *@param failTime is the new fail time (-1L if none).
+  *@param failCount is the new fail retry count (-1 if none).
+  */
+  @Override
+  public void retryDeleteNotification(JobNotifyRecord jnr, long failTime, int failCount)
+    throws ManifoldCFException
+  {
+    Long jobID = jnr.getJobID();
+    long oldFailTime = jnr.getFailTime();
+    if (oldFailTime == -1L)
+      oldFailTime = failTime;
+    failTime = oldFailTime;
+    int oldFailCount = jnr.getFailRetryCount();
+    if (oldFailCount == -1)
+      oldFailCount = failCount;
+    else
+    {
+      oldFailCount--;
+      if (failCount != -1 && oldFailCount > failCount)
+        oldFailCount = failCount;
+    }
+    failCount = oldFailCount;
+
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        jobs.retryDeleteNotification(jobID,failTime,failCount);
+        database.performCommit();
+        break;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction resetting job notification: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+
+  }
+
   // Add documents methods
   
   /** Add an initial set of documents to the queue.
@@ -6862,12 +6923,12 @@ public class JobManager implements IJobM
     }
   }
 
-  /** Get the list of jobs that are ready for deletion.
+  /** Get the list of jobs that are ready for delete cleanup.
   *@param processID is the current process ID.
   *@return jobs that were in the "readyfordelete" state.
   */
   @Override
-  public JobDeleteRecord[] getJobsReadyForDelete(String processID)
+  public JobDeleteRecord[] getJobsReadyForDeleteCleanup(String processID)
     throws ManifoldCFException
   {
     while (true)
@@ -7091,6 +7152,84 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Remove a job, from the notification state.
+  *@param jobID is the ID of the job to remove.
+  */
+  @Override
+  public void removeJob(Long jobID)
+    throws ManifoldCFException
+  {
+    // While there is no flow that can cause a job to be in the wrong state when this gets called, as a precaution
+    // it might be a good idea to put this in a transaction and have the state get checked first.
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Check job status
+        StringBuilder sb = new StringBuilder("SELECT ");
+        ArrayList list = new ArrayList();
+        
+        sb.append(jobs.statusField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobs.idField,jobID)}))
+          .append(" FOR UPDATE");
+            
+        IResultSet set = database.performQuery(sb.toString(),list,null,null);
+        if (set.getRowCount() == 0)
+          // Presume already removed!
+          return;
+        IResultRow row = set.getRow(0);
+        int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+
+        switch (status)
+        {
+        case Jobs.STATUS_NOTIFYINGOFDELETION:
+          ManifoldCF.noteConfigurationChange();
+          // Remove documents from job queue
+          jobQueue.deleteAllJobRecords(jobID);
+          // Remove carrydowns for the job
+          carryDown.deleteOwner(jobID);
+          // Nothing is in a critical section - so this should be OK.
+          hopCount.deleteOwner(jobID);
+          jobs.delete(jobID);
+          if (Logging.jobs.isDebugEnabled())
+          {
+            Logging.jobs.debug("Removed job "+jobID);
+          }
+          break;
+        default:
+          throw new ManifoldCFException("Unexpected job status: "+Integer.toString(status));
+        }
+        database.performCommit();
+        return;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted clearing delete notification state for job: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Reset a job starting for delete back to "ready for delete"
   * state.
   *@param jobID is the job id.
@@ -7229,6 +7368,75 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Reset a job that is delete notifying back to "ready for delete notify"
+  * state.
+  *@param jobID is the job id.
+  */
+  @Override
+  public void resetDeleteNotifyJob(Long jobID)
+    throws ManifoldCFException
+  {
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Check job status
+        StringBuilder sb = new StringBuilder("SELECT ");
+        ArrayList list = new ArrayList();
+        
+        sb.append(jobs.statusField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobs.idField,jobID)}))
+          .append(" FOR UPDATE");
+            
+        IResultSet set = database.performQuery(sb.toString(),list,null,null);
+        if (set.getRowCount() == 0)
+          throw new ManifoldCFException("No such job: "+jobID);
+        IResultRow row = set.getRow(0);
+        int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+
+        switch (status)
+        {
+        case Jobs.STATUS_NOTIFYINGOFDELETION:
+          if (Logging.jobs.isDebugEnabled())
+            Logging.jobs.debug("Setting job "+jobID+" back to 'ReadyForDeleteNotify' state");
+
+          // Set the state of the job back to "ReadyForDeleteNotify"
+          jobs.writePermanentStatus(jobID,jobs.STATUS_READYFORDELETENOTIFY,true);
+          break;
+        default:
+          throw new ManifoldCFException("Unexpected job status: "+Integer.toString(status));
+        }
+        database.performCommit();
+        return;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted resetting delete notify job: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Reset a starting job back to "ready for startup" state.
   *@param jobID is the job id.
   */
@@ -7508,18 +7716,12 @@ public class JobManager implements IJobM
           if (confirmSet.getRowCount() > 0)
             continue;
 
-          ManifoldCF.noteConfigurationChange();
-          // Remove documents from job queue
-          jobQueue.deleteAllJobRecords(jobID);
-          // Remove carrydowns for the job
-          carryDown.deleteOwner(jobID);
-          // Nothing is in a critical section - so this should be OK.
-          hopCount.deleteOwner(jobID);
-          jobs.delete(jobID);
+          jobs.finishJobCleanup(jobID);
           if (Logging.jobs.isDebugEnabled())
           {
-            Logging.jobs.debug("Removed job "+jobID);
+            Logging.jobs.debug("Job "+jobID+" cleanup is now completed");
           }
+
         }
         database.performCommit();
         return;
@@ -7732,7 +7934,88 @@ public class JobManager implements IJobM
       }
     }
   }
-  
+
+  /** Find the list of jobs that need to have their connectors notified of job deletion.
+  *@param processID is the process ID.
+  *@return notification records for the jobs that need their output connectors notified in order to be removed.
+  */
+  @Override
+  public JobNotifyRecord[] getJobsReadyForDelete(String processID)
+    throws ManifoldCFException
+  {
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Do the query
+        StringBuilder sb = new StringBuilder("SELECT ");
+        ArrayList list = new ArrayList();
+        
+        sb.append(jobs.idField).append(",").append(jobs.failTimeField).append(",").append(jobs.failCountField)
+          .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_READYFORDELETENOTIFY))}))
+          .append(" FOR UPDATE");
+            
+        IResultSet set = database.performQuery(sb.toString(),list,null,null);
+        // Return them all
+        JobNotifyRecord[] rval = new JobNotifyRecord[set.getRowCount()];
+        int i = 0;
+        while (i < rval.length)
+        {
+          IResultRow row = set.getRow(i);
+          Long jobID = (Long)row.getValue(jobs.idField);
+          Long failTimeLong = (Long)row.getValue(jobs.failTimeField);
+          Long failRetryCountLong = (Long)row.getValue(jobs.failCountField);
+          long failTime;
+          if (failTimeLong == null)
+            failTime = -1L;
+          else
+            failTime = failTimeLong.longValue();
+          int failRetryCount;
+          if (failRetryCountLong == null)
+            failRetryCount = -1;
+          else
+            failRetryCount = (int)failRetryCountLong.longValue();
+      
+          // Mark status of job as "notifying of deletion"
+          jobs.writeTransientStatus(jobID,jobs.STATUS_NOTIFYINGOFDELETION,processID);
+          if (Logging.jobs.isDebugEnabled())
+          {
+            Logging.jobs.debug("Found job "+jobID+" in need of delete notification");
+          }
+          rval[i++] = new JobNotifyRecord(jobID,failTime,failRetryCount);
+        }
+        database.performCommit();
+        return rval;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted getting jobs ready for notify: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Complete the sequence that resumes jobs, either from a pause or from a scheduling window
   * wait.  The logic will restore the job to an active state (many possibilities depending on
   * connector status), and will record the jobs that have been so modified.
@@ -8256,6 +8539,8 @@ public class JobManager implements IJobM
         break;
       case Jobs.STATUS_READYFORNOTIFY:
       case Jobs.STATUS_NOTIFYINGOFCOMPLETION:
+      case Jobs.STATUS_READYFORDELETENOTIFY:
+      case Jobs.STATUS_NOTIFYINGOFDELETION:
         rstatus = JobStatus.JOBSTATUS_JOBENDNOTIFICATION;
         break;
       case Jobs.STATUS_ABORTING:

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Fri Jun 20 16:56:19 2014
@@ -112,6 +112,8 @@ public class Jobs extends org.apache.man
   public static final int STATUS_DELETING = 35;                         // The job is deleting.
   public static final int STATUS_DELETESTARTINGUP = 36;         // The delete is starting up.
   public static final int STATUS_ABORTINGSHUTTINGDOWN = 37;     // Aborting the cleanup phase.
+  public static final int STATUS_READYFORDELETENOTIFY = 38;     // Job is ready for delete notification
+  public static final int STATUS_NOTIFYINGOFDELETION = 39;      // Notifying connector of job deletion
   
   // These statuses have to do with whether a job has an installed underlying connector or not.
   // There are two reasons to have a special state here: (1) if the behavior of the crawler differs, or (2) if the
@@ -122,9 +124,9 @@ public class Jobs extends org.apache.man
   // But, since there is no indication in the jobs table of an uninstalled connector for such jobs, the code which starts
   // jobs up (or otherwise would enter any state that has a corresponding special state) must check to see if the underlying
   // connector exists before deciding what state to put the job into.
-  public static final int STATUS_ACTIVE_UNINSTALLED = 38;               // Active, but repository connector not installed
-  public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 39;   // Active and seeding, but repository connector not installed
-  public static final int STATUS_DELETING_NOOUTPUT = 40;                // Job is being deleted but there's no output connector installed
+  public static final int STATUS_ACTIVE_UNINSTALLED = 40;               // Active, but repository connector not installed
+  public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 41;   // Active and seeding, but repository connector not installed
+  public static final int STATUS_DELETING_NOOUTPUT = 42;                // Job is being deleted but there's no output connector installed
 
   // Deprecated states.  These states should never be used; they're defined only for upgrade purposes
   public static final int STATUS_ACTIVE_NOOUTPUT = 100;                  // Active, but output connector not installed
@@ -213,6 +215,8 @@ public class Jobs extends org.apache.man
     statusMap.put("S",new Integer(STATUS_SHUTTINGDOWN));
     statusMap.put("s",new Integer(STATUS_READYFORNOTIFY));
     statusMap.put("n",new Integer(STATUS_NOTIFYINGOFCOMPLETION));
+    statusMap.put("d",new Integer(STATUS_READYFORDELETENOTIFY));
+    statusMap.put("j",new Integer(STATUS_NOTIFYINGOFDELETION));
     statusMap.put("W",new Integer(STATUS_ACTIVEWAIT));
     statusMap.put("Z",new Integer(STATUS_PAUSEDWAIT));
     statusMap.put("X",new Integer(STATUS_ABORTING));
@@ -245,15 +249,18 @@ public class Jobs extends org.apache.man
     statusMap.put("I",new Integer(STATUS_RESUMING));
     statusMap.put("i",new Integer(STATUS_RESUMINGSEEDING));
 
+
     // These are the uninstalled states.  The values, I'm afraid, are pretty random.
     statusMap.put("R",new Integer(STATUS_ACTIVE_UNINSTALLED));
     statusMap.put("r",new Integer(STATUS_ACTIVESEEDING_UNINSTALLED));
+    statusMap.put("D",new Integer(STATUS_DELETING_NOOUTPUT));
+
+    // These are deprecated states; we may be able to reclaim them
     statusMap.put("O",new Integer(STATUS_ACTIVE_NOOUTPUT));
     statusMap.put("o",new Integer(STATUS_ACTIVESEEDING_NOOUTPUT));
     statusMap.put("U",new Integer(STATUS_ACTIVE_NEITHER));
     statusMap.put("u",new Integer(STATUS_ACTIVESEEDING_NEITHER));
-    statusMap.put("D",new Integer(STATUS_DELETING_NOOUTPUT));
-    
+
     typeMap = new HashMap<String,Integer>();
     typeMap.put("C",new Integer(TYPE_CONTINUOUS));
     typeMap.put("S",new Integer(TYPE_SPECIFIED));
@@ -289,6 +296,7 @@ public class Jobs extends org.apache.man
   * STATUS_PAUSEDWAIT
   * STATUS_PAUSED
   * STATUS_READYFORNOTIFY
+  * STATUS_READYFORDELETENOTIFY
   * STATUS_READYFORDELETE
   * STATUS_DELETING
   * STATUS_READYFORSTARTUP
@@ -310,6 +318,7 @@ public class Jobs extends org.apache.man
   * These are the process-transient states:
   * STATUS_DELETESTARTINGUP
   * STATUS_NOTIFYINGOFCOMPLETION
+  * STATUS_NOTIFYINGOFDELETION
   * STATUS_STARTINGUP
   * STATUS_STARTINGUPMINIMAL
   * STATUS_ABORTINGSTARTINGUPFORRESTART
@@ -1112,6 +1121,17 @@ public class Jobs extends org.apache.man
     map.put(failCountField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
 
+    // Notifying of deletion goes back to just being ready for delete notify
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFDELETION)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+    map.put(processIDField,null);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
     // Starting up or aborting starting up goes back to just being ready
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -1259,6 +1279,14 @@ public class Jobs extends org.apache.man
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
 
+    // Notifying of deletion goes back to just being ready for delete notify
+    list.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFDELETION))});
+    map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,invKey);
+
     // Starting up or aborting starting up goes back to just being ready
     list.clear();
     query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -1666,6 +1694,17 @@ public class Jobs extends org.apache.man
     map.put(failCountField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
 
+    list.clear();
+    map.clear();
+    query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFDELETION)),
+      new UnitaryClause(processIDField,processID)});
+    map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+    map.put(processIDField,null);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
+    performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+
   }
   
   /** Reset startup worker thread status.
@@ -1948,7 +1987,32 @@ public class Jobs extends org.apache.man
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }
-  
+
+  /** Retry delete notification.
+  *@param jobID is the job identifier.
+  *@param failTime is the fail time, -1 == none
+  *@param failCount is the fail count to use, -1 == none.
+  */
+  public void retryDeleteNotification(Long jobID, long failTime, int failCount)
+    throws ManifoldCFException
+  {
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(idField,jobID)});
+    HashMap map = new HashMap();
+    map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+    if (failTime == -1L)
+      map.put(failTimeField,null);
+    else
+      map.put(failTimeField,new Long(failTime));
+    if (failCount == -1)
+      map.put(failCountField,null);
+    else
+      map.put(failCountField,failCount);
+    map.put(processIDField,null);
+    performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+  }
+
   /** Write job status and window end, and clear the endtime field.  (The start time will be written
   * when the job enters the "active" state.)
   *@param jobID is the job identifier.
@@ -2758,6 +2822,24 @@ public class Jobs extends org.apache.man
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }
 
+  /** Finish job cleanup.
+  * Sets the job status to "ready for delete notify" and clears the error field.
+  *@param jobID is the job id.
+  */
+  public void finishJobCleanup(Long jobID)
+    throws ManifoldCFException
+  {
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(idField,jobID)});
+    HashMap map = new HashMap();
+    map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+    map.put(errorField,null);
+    // Anything else?
+    // MHL
+    performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+  }
+
   /** Resume a stopped job (from a pause or activewait).
   * Updates the job record in a manner consistent with the job's state.
   */
@@ -3113,6 +3195,10 @@ public class Jobs extends org.apache.man
       return "n";
     case STATUS_READYFORNOTIFY:
       return "s";
+    case STATUS_NOTIFYINGOFDELETION:
+      return "j";
+    case STATUS_READYFORDELETENOTIFY:
+      return "d";
     case STATUS_ACTIVEWAIT:
       return "W";
     case STATUS_PAUSEDWAIT:
@@ -3157,14 +3243,6 @@ public class Jobs extends org.apache.man
       return "R";
     case STATUS_ACTIVESEEDING_UNINSTALLED:
       return "r";
-    case STATUS_ACTIVE_NOOUTPUT:
-      return "O";
-    case STATUS_ACTIVESEEDING_NOOUTPUT:
-      return "o";
-    case STATUS_ACTIVE_NEITHER:
-      return "U";
-    case STATUS_ACTIVESEEDING_NEITHER:
-      return "u";
     case STATUS_DELETING_NOOUTPUT:
       return "D";
     
@@ -3188,6 +3266,16 @@ public class Jobs extends org.apache.man
     case STATUS_ABORTINGSHUTTINGDOWN:
       return "v";
 
+    // These are deprecated
+    case STATUS_ACTIVE_NOOUTPUT:
+      return "O";
+    case STATUS_ACTIVESEEDING_NOOUTPUT:
+      return "o";
+    case STATUS_ACTIVE_NEITHER:
+      return "U";
+    case STATUS_ACTIVESEEDING_NEITHER:
+      return "u";
+
     default:
       throw new ManifoldCFException("Bad status value: "+Integer.toString(status));
     }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java Fri Jun 20 16:56:19 2014
@@ -249,6 +249,184 @@ public class JobNotificationThread exten
               throw exception;
           }
 
+          // Now handle jobs awaiting delete notification: notify their output connections, then remove the jobs.
+          JobNotifyRecord[] jobsNeedingDeleteNotification = jobManager.getJobsReadyForDelete(processID);
+          try
+          {
+            Set<OutputAndRepositoryConnection> connectionNames = new HashSet<OutputAndRepositoryConnection>();
+            
+            int k = 0;
+            while (k < jobsNeedingDeleteNotification.length)
+            {
+              JobNotifyRecord jsr = jobsNeedingDeleteNotification[k++];
+              Long jobID = jsr.getJobID();
+              IJobDescription job = jobManager.load(jobID,true);
+              if (job != null)
+              {
+                // Record each distinct (output connection, repository connection) pair used by this job
+                String repositoryConnectionName = job.getConnectionName();
+                IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
+                for (int i = 0; i < basicSpec.getOutputCount(); i++)
+                {
+                  String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
+                  OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
+                  connectionNames.add(c);
+                }
+              }
+            }
+            
+            // Attempt to notify the specified connections
+            Map<OutputAndRepositoryConnection,Disposition> notifiedConnections = new HashMap<OutputAndRepositoryConnection,Disposition>();
+            
+            for (OutputAndRepositoryConnection connections : connectionNames)
+            {
+              String outputConnectionName = connections.getOutputConnectionName();
+              String repositoryConnectionName = connections.getRepositoryConnectionName();
+              
+              OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
+              
+              IOutputConnection connection = connectionManager.load(outputConnectionName);
+              if (connection != null)
+              {
+                // Grab an appropriate connection instance
+                IOutputConnector connector = outputConnectorPool.grab(connection);
+                if (connector != null)
+                {
+                  try
+                  {
+                    // Do the notification itself
+                    try
+                    {
+                      connector.noteJobComplete(activity);
+                      notifiedConnections.put(connections,new Disposition());
+                    }
+                    catch (ServiceInterruption e)
+                    {
+                      notifiedConnections.put(connections,new Disposition(e));
+                    }
+                    catch (ManifoldCFException e)
+                    {
+                      if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+                        throw e;
+                      if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+                        throw e;
+                      if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+                        throw e;
+                      // Nothing special; report the error and keep going.
+                      Logging.threads.error(e.getMessage(),e);
+                    }
+                  }
+                  finally
+                  {
+                    outputConnectorPool.release(connection,connector);
+                  }
+                }
+              }
+            }
+            
+            // Go through jobs again, and remove the ones whose notifications did not report a service interruption.
+            k = 0;
+            while (k < jobsNeedingDeleteNotification.length)
+            {
+              JobNotifyRecord jsr = jobsNeedingDeleteNotification[k++];
+              Long jobID = jsr.getJobID();
+              IJobDescription job = jobManager.load(jobID,true);
+              if (job != null)
+              {
+                // Look up the notification outcome for each of this job's output connections
+                String repositoryConnectionName = job.getConnectionName();
+                IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
+                boolean allOK = true;
+                for (int i = 0; i < basicSpec.getOutputCount(); i++)
+                {
+                  String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
+
+                  OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
+                  
+                  Disposition d = notifiedConnections.get(c);
+                  if (d != null)
+                  {
+                    ServiceInterruption e = d.getServiceInterruption();
+                    if (e == null)
+                    {
+                      break;
+                    }
+                    else
+                    {
+                      if (!e.jobInactiveAbort())
+                      {
+                        Logging.jobs.warn("Delete notification service interruption reported for job "+
+                          jobID+" output connection '"+outputConnectionName+"': "+
+                          e.getMessage(),e);
+                      }
+
+                      // If either we are going to be requeuing beyond the fail time, OR
+                      // the number of retries available has hit 0, THEN we treat this
+                      // as either an "ignore" or a hard error.
+                      if (!e.jobInactiveAbort() && (jsr.getFailTime() != -1L && jsr.getFailTime() < e.getRetryTime() ||
+                        jsr.getFailRetryCount() == 0))
+                      {
+                        // Treat this as a hard failure.
+                        if (e.isAbortOnFail())
+                        {
+                          // Note the error in the job, and transition to inactive state
+                          String message = e.jobInactiveAbort()?"":"Repeated service interruptions during notification"+((e.getCause()!=null)?": "+e.getCause().getMessage():"");
+                          if (jobManager.errorAbort(jobID,message) && message.length() > 0)
+                            Logging.jobs.error(message,e.getCause());
+                          jsr.noteStarted();
+                        }
+                        else
+                        {
+                          // Not sure this can happen -- but just remove the job silently
+                          jobManager.removeJob(jobID);
+                          jsr.noteStarted();
+                        }
+                      }
+                      else
+                      {
+                        // Reset the job to the READYFORDELETENOTIFY state, updating the failtime and failcount fields
+                        jobManager.retryDeleteNotification(jsr,e.getFailTime(),e.getFailRetryCount());
+                        jsr.noteStarted();
+                      }
+                      allOK = false;
+                      break;
+                    }
+                  }
+                }
+                if (allOK)
+                {
+                  jobManager.removeJob(jobID);
+                  jsr.noteStarted();
+                }
+
+              }
+            }
+          }
+          finally
+          {
+            // Clean up all jobs that did not start
+            ManifoldCFException exception = null;
+            int i = 0;
+            while (i < jobsNeedingDeleteNotification.length)
+            {
+              JobNotifyRecord jsr = jobsNeedingDeleteNotification[i++];
+              if (!jsr.wasStarted())
+              {
+                // Clean up from failed start.
+                try
+                {
+                  jobManager.resetDeleteNotifyJob(jsr.getJobID());
+                }
+                catch (ManifoldCFException e)
+                {
+                  exception = e;
+                }
+              }
+            }
+            if (exception != null)
+              throw exception;
+          }
+
           ManifoldCF.sleep(10000L);
         }
         catch (ManifoldCFException e)

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java Fri Jun 20 16:56:19 2014
@@ -84,7 +84,7 @@ public class StartDeleteThread extends T
 
           // See if there are any starting jobs.
           // Note: Since this following call changes the job state, we must be careful to reset it on any kind of failure.
-          JobDeleteRecord[] deleteJobs = jobManager.getJobsReadyForDelete(processID);
+          JobDeleteRecord[] deleteJobs = jobManager.getJobsReadyForDeleteCleanup(processID);
           try
           {