You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/20 18:56:19 UTC
svn commit: r1604213 - in /manifoldcf/trunk: ./
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/
framework/pull-agent/src/main/java/org/apache/manifoldcf...
Author: kwright
Date: Fri Jun 20 16:56:19 2014
New Revision: 1604213
URL: http://svn.apache.org/r1604213
Log:
Fix for CONNECTORS-980.
Modified:
manifoldcf/trunk/ (props changed)
manifoldcf/trunk/CHANGES.txt
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java
Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
Merged /manifoldcf/branches/CONNECTORS-980:r1604127-1604211
Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Fri Jun 20 16:56:19 2014
@@ -3,6 +3,9 @@ $Id$
======================= 1.7-dev =====================
+CONNECTORS-980: Output connector gets notified now when job is deleted.
+(Karl Wright)
+
CONNECTORS-954: Revamp AmazonCloudSearch output connector completely.
(1) Remove Tika and field mapping, since that would be done upstream in the
pipeline.
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Fri Jun 20 16:56:19 2014
@@ -548,7 +548,15 @@ public interface IJobManager
*/
public void retryNotification(JobNotifyRecord jobNotifyRecord, long failTime, int failRetryCount)
throws ManifoldCFException;
-
+
+ /** Retry delete notification.
+ *@param jnr is the current job notification record.
+ *@param failTime is the new fail time (-1L if none).
+ *@param failCount is the new fail retry count (-1 if none).
+ */
+ public void retryDeleteNotification(JobNotifyRecord jnr, long failTime, int failCount)
+ throws ManifoldCFException;
+
/** Add an initial set of documents to the queue.
* This method is called during job startup, when the queue is being loaded.
* A set of document references is passed to this method, which updates the status of the document
@@ -825,11 +833,11 @@ public interface IJobManager
public void resetSeedJob(Long jobID)
throws ManifoldCFException;
- /** Get the list of jobs that are ready for deletion.
+ /** Get the list of jobs that are ready for delete cleanup.
*@param processID is the current process ID.
*@return jobs that were in the "readyfordelete" state.
*/
- public JobDeleteRecord[] getJobsReadyForDelete(String processID)
+ public JobDeleteRecord[] getJobsReadyForDeleteCleanup(String processID)
throws ManifoldCFException;
/** Get the list of jobs that are ready for startup.
@@ -846,12 +854,25 @@ public interface IJobManager
public JobNotifyRecord[] getJobsReadyForInactivity(String processID)
throws ManifoldCFException;
+ /** Find the list of jobs that need to have their connectors notified of job deletion.
+ *@param processID is the process ID.
+ *@return notification records for the jobs that need their output connectors notified in order to be removed.
+ */
+ public JobNotifyRecord[] getJobsReadyForDelete(String processID)
+ throws ManifoldCFException;
+
/** Inactivate a job, from the notification state.
*@param jobID is the ID of the job to inactivate.
*/
public void inactivateJob(Long jobID)
throws ManifoldCFException;
+ /** Remove a job, from the notification state.
+ *@param jobID is the ID of the job to remove.
+ */
+ public void removeJob(Long jobID)
+ throws ManifoldCFException;
+
/** Reset a job starting for delete back to "ready for delete"
* state.
*@param jobID is the job id.
@@ -866,6 +887,13 @@ public interface IJobManager
public void resetNotifyJob(Long jobID)
throws ManifoldCFException;
+ /** Reset a job that is delete notifying back to "ready for delete notify"
+ * state.
+ *@param jobID is the job id.
+ */
+ public void resetDeleteNotifyJob(Long jobID)
+ throws ManifoldCFException;
+
/** Reset a starting job back to "ready for startup" state.
*@param jobID is the job id.
*/
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Fri Jun 20 16:56:19 2014
@@ -4299,7 +4299,68 @@ public class JobManager implements IJobM
}
}
-
+
+ /** Retry delete notification.
+ *@param jnr is the current job notification record.
+ *@param failTime is the new fail time (-1L if none).
+ *@param failCount is the new fail retry count (-1 if none).
+ */
+ @Override
+ public void retryDeleteNotification(JobNotifyRecord jnr, long failTime, int failCount)
+ throws ManifoldCFException
+ {
+ Long jobID = jnr.getJobID();
+ long oldFailTime = jnr.getFailTime();
+ if (oldFailTime == -1L)
+ oldFailTime = failTime;
+ failTime = oldFailTime;
+ int oldFailCount = jnr.getFailRetryCount();
+ if (oldFailCount == -1)
+ oldFailCount = failCount;
+ else
+ {
+ oldFailCount--;
+ if (failCount != -1 && oldFailCount > failCount)
+ oldFailCount = failCount;
+ }
+ failCount = oldFailCount;
+
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
+ {
+ jobs.retryDeleteNotification(jobID,failTime,failCount);
+ database.performCommit();
+ break;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction resetting job notification: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+
+ }
+
// Add documents methods
/** Add an initial set of documents to the queue.
@@ -6862,12 +6923,12 @@ public class JobManager implements IJobM
}
}
- /** Get the list of jobs that are ready for deletion.
+ /** Get the list of jobs that are ready for delete cleanup.
*@param processID is the current process ID.
*@return jobs that were in the "readyfordelete" state.
*/
@Override
- public JobDeleteRecord[] getJobsReadyForDelete(String processID)
+ public JobDeleteRecord[] getJobsReadyForDeleteCleanup(String processID)
throws ManifoldCFException
{
while (true)
@@ -7091,6 +7152,84 @@ public class JobManager implements IJobM
}
}
+ /** Remove a job, from the notification state.
+ *@param jobID is the ID of the job to remove.
+ */
+ @Override
+ public void removeJob(Long jobID)
+ throws ManifoldCFException
+ {
+ // While there is no flow that can cause a job to be in the wrong state when this gets called, as a precaution
+ // it might be a good idea to put this in a transaction and have the state get checked first.
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
+ {
+ // Check job status
+ StringBuilder sb = new StringBuilder("SELECT ");
+ ArrayList list = new ArrayList();
+
+ sb.append(jobs.statusField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+ .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobs.idField,jobID)}))
+ .append(" FOR UPDATE");
+
+ IResultSet set = database.performQuery(sb.toString(),list,null,null);
+ if (set.getRowCount() == 0)
+ // Presume already removed!
+ return;
+ IResultRow row = set.getRow(0);
+ int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+
+ switch (status)
+ {
+ case Jobs.STATUS_NOTIFYINGOFDELETION:
+ ManifoldCF.noteConfigurationChange();
+ // Remove documents from job queue
+ jobQueue.deleteAllJobRecords(jobID);
+ // Remove carrydowns for the job
+ carryDown.deleteOwner(jobID);
+ // Nothing is in a critical section - so this should be OK.
+ hopCount.deleteOwner(jobID);
+ jobs.delete(jobID);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Removed job "+jobID);
+ }
+ break;
+ default:
+ throw new ManifoldCFException("Unexpected job status: "+Integer.toString(status));
+ }
+ database.performCommit();
+ return;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted clearing delete notification state for job: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+
/** Reset a job starting for delete back to "ready for delete"
* state.
*@param jobID is the job id.
@@ -7229,6 +7368,75 @@ public class JobManager implements IJobM
}
}
+ /** Reset a job that is delete notifying back to "ready for delete notify"
+ * state.
+ *@param jobID is the job id.
+ */
+ @Override
+ public void resetDeleteNotifyJob(Long jobID)
+ throws ManifoldCFException
+ {
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
+ {
+ // Check job status
+ StringBuilder sb = new StringBuilder("SELECT ");
+ ArrayList list = new ArrayList();
+
+ sb.append(jobs.statusField).append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+ .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobs.idField,jobID)}))
+ .append(" FOR UPDATE");
+
+ IResultSet set = database.performQuery(sb.toString(),list,null,null);
+ if (set.getRowCount() == 0)
+ throw new ManifoldCFException("No such job: "+jobID);
+ IResultRow row = set.getRow(0);
+ int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+
+ switch (status)
+ {
+ case Jobs.STATUS_NOTIFYINGOFDELETION:
+ if (Logging.jobs.isDebugEnabled())
+ Logging.jobs.debug("Setting job "+jobID+" back to 'ReadyForDeleteNotify' state");
+
+ // Set the state of the job back to "ReadyForDeleteNotify"
+ jobs.writePermanentStatus(jobID,jobs.STATUS_READYFORDELETENOTIFY,true);
+ break;
+ default:
+ throw new ManifoldCFException("Unexpected job status: "+Integer.toString(status));
+ }
+ database.performCommit();
+ return;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted resetting delete notify job: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+
/** Reset a starting job back to "ready for startup" state.
*@param jobID is the job id.
*/
@@ -7508,18 +7716,12 @@ public class JobManager implements IJobM
if (confirmSet.getRowCount() > 0)
continue;
- ManifoldCF.noteConfigurationChange();
- // Remove documents from job queue
- jobQueue.deleteAllJobRecords(jobID);
- // Remove carrydowns for the job
- carryDown.deleteOwner(jobID);
- // Nothing is in a critical section - so this should be OK.
- hopCount.deleteOwner(jobID);
- jobs.delete(jobID);
+ jobs.finishJobCleanup(jobID);
if (Logging.jobs.isDebugEnabled())
{
- Logging.jobs.debug("Removed job "+jobID);
+ Logging.jobs.debug("Job "+jobID+" cleanup is now completed");
}
+
}
database.performCommit();
return;
@@ -7732,7 +7934,88 @@ public class JobManager implements IJobM
}
}
}
-
+
+ /** Find the list of jobs that need to have their connectors notified of job deletion.
+ *@param processID is the process ID.
+ *@return notification records for the jobs that need their output connectors notified in order to be removed.
+ */
+ @Override
+ public JobNotifyRecord[] getJobsReadyForDelete(String processID)
+ throws ManifoldCFException
+ {
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
+ {
+ // Do the query
+ StringBuilder sb = new StringBuilder("SELECT ");
+ ArrayList list = new ArrayList();
+
+ sb.append(jobs.idField).append(",").append(jobs.failTimeField).append(",").append(jobs.failCountField)
+ .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+ .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobs.statusField,jobs.statusToString(jobs.STATUS_READYFORDELETENOTIFY))}))
+ .append(" FOR UPDATE");
+
+ IResultSet set = database.performQuery(sb.toString(),list,null,null);
+ // Return them all
+ JobNotifyRecord[] rval = new JobNotifyRecord[set.getRowCount()];
+ int i = 0;
+ while (i < rval.length)
+ {
+ IResultRow row = set.getRow(i);
+ Long jobID = (Long)row.getValue(jobs.idField);
+ Long failTimeLong = (Long)row.getValue(jobs.failTimeField);
+ Long failRetryCountLong = (Long)row.getValue(jobs.failCountField);
+ long failTime;
+ if (failTimeLong == null)
+ failTime = -1L;
+ else
+ failTime = failTimeLong.longValue();
+ int failRetryCount;
+ if (failRetryCountLong == null)
+ failRetryCount = -1;
+ else
+ failRetryCount = (int)failRetryCountLong.longValue();
+
+ // Mark status of job as "notifying of deletion"
+ jobs.writeTransientStatus(jobID,jobs.STATUS_NOTIFYINGOFDELETION,processID);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Found job "+jobID+" in need of delete notification");
+ }
+ rval[i++] = new JobNotifyRecord(jobID,failTime,failRetryCount);
+ }
+ database.performCommit();
+ return rval;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted getting jobs ready for notify: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+
/** Complete the sequence that resumes jobs, either from a pause or from a scheduling window
* wait. The logic will restore the job to an active state (many possibilities depending on
* connector status), and will record the jobs that have been so modified.
@@ -8256,6 +8539,8 @@ public class JobManager implements IJobM
break;
case Jobs.STATUS_READYFORNOTIFY:
case Jobs.STATUS_NOTIFYINGOFCOMPLETION:
+ case Jobs.STATUS_READYFORDELETENOTIFY:
+ case Jobs.STATUS_NOTIFYINGOFDELETION:
rstatus = JobStatus.JOBSTATUS_JOBENDNOTIFICATION;
break;
case Jobs.STATUS_ABORTING:
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Fri Jun 20 16:56:19 2014
@@ -112,6 +112,8 @@ public class Jobs extends org.apache.man
public static final int STATUS_DELETING = 35; // The job is deleting.
public static final int STATUS_DELETESTARTINGUP = 36; // The delete is starting up.
public static final int STATUS_ABORTINGSHUTTINGDOWN = 37; // Aborting the cleanup phase.
+ public static final int STATUS_READYFORDELETENOTIFY = 38; // Job is ready for delete notification
+ public static final int STATUS_NOTIFYINGOFDELETION = 39; // Notifying connector of job deletion
// These statuses have to do with whether a job has an installed underlying connector or not.
// There are two reasons to have a special state here: (1) if the behavior of the crawler differs, or (2) if the
@@ -122,9 +124,9 @@ public class Jobs extends org.apache.man
// But, since there is no indication in the jobs table of an uninstalled connector for such jobs, the code which starts
// jobs up (or otherwise would enter any state that has a corresponding special state) must check to see if the underlying
// connector exists before deciding what state to put the job into.
- public static final int STATUS_ACTIVE_UNINSTALLED = 38; // Active, but repository connector not installed
- public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 39; // Active and seeding, but repository connector not installed
- public static final int STATUS_DELETING_NOOUTPUT = 40; // Job is being deleted but there's no output connector installed
+ public static final int STATUS_ACTIVE_UNINSTALLED = 40; // Active, but repository connector not installed
+ public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 41; // Active and seeding, but repository connector not installed
+ public static final int STATUS_DELETING_NOOUTPUT = 42; // Job is being deleted but there's no output connector installed
// Deprecated states. These states should never be used; they're defined only for upgrade purposes
public static final int STATUS_ACTIVE_NOOUTPUT = 100; // Active, but output connector not installed
@@ -213,6 +215,8 @@ public class Jobs extends org.apache.man
statusMap.put("S",new Integer(STATUS_SHUTTINGDOWN));
statusMap.put("s",new Integer(STATUS_READYFORNOTIFY));
statusMap.put("n",new Integer(STATUS_NOTIFYINGOFCOMPLETION));
+ statusMap.put("d",new Integer(STATUS_READYFORDELETENOTIFY));
+ statusMap.put("j",new Integer(STATUS_NOTIFYINGOFDELETION));
statusMap.put("W",new Integer(STATUS_ACTIVEWAIT));
statusMap.put("Z",new Integer(STATUS_PAUSEDWAIT));
statusMap.put("X",new Integer(STATUS_ABORTING));
@@ -245,15 +249,18 @@ public class Jobs extends org.apache.man
statusMap.put("I",new Integer(STATUS_RESUMING));
statusMap.put("i",new Integer(STATUS_RESUMINGSEEDING));
+
// These are the uninstalled states. The values, I'm afraid, are pretty random.
statusMap.put("R",new Integer(STATUS_ACTIVE_UNINSTALLED));
statusMap.put("r",new Integer(STATUS_ACTIVESEEDING_UNINSTALLED));
+ statusMap.put("D",new Integer(STATUS_DELETING_NOOUTPUT));
+
+ // These are deprecated states; we may be able to reclaim them
statusMap.put("O",new Integer(STATUS_ACTIVE_NOOUTPUT));
statusMap.put("o",new Integer(STATUS_ACTIVESEEDING_NOOUTPUT));
statusMap.put("U",new Integer(STATUS_ACTIVE_NEITHER));
statusMap.put("u",new Integer(STATUS_ACTIVESEEDING_NEITHER));
- statusMap.put("D",new Integer(STATUS_DELETING_NOOUTPUT));
-
+
typeMap = new HashMap<String,Integer>();
typeMap.put("C",new Integer(TYPE_CONTINUOUS));
typeMap.put("S",new Integer(TYPE_SPECIFIED));
@@ -289,6 +296,7 @@ public class Jobs extends org.apache.man
* STATUS_PAUSEDWAIT
* STATUS_PAUSED
* STATUS_READYFORNOTIFY
+ * STATUS_READYFORDELETENOTIFY
* STATUS_READYFORDELETE
* STATUS_DELETING
* STATUS_READYFORSTARTUP
@@ -310,6 +318,7 @@ public class Jobs extends org.apache.man
* These are the process-transient states:
* STATUS_DELETESTARTINGUP
* STATUS_NOTIFYINGOFCOMPLETION
+ * STATUS_NOTIFYINGOFDELETION
* STATUS_STARTINGUP
* STATUS_STARTINGUPMINIMAL
* STATUS_ABORTINGSTARTINGUPFORRESTART
@@ -1112,6 +1121,17 @@ public class Jobs extends org.apache.man
map.put(failCountField,null);
performUpdate(map,"WHERE "+query,list,invKey);
+ // Notifying of deletion goes back to just being ready for delete notify
+ list.clear();
+ query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFDELETION)),
+ new UnitaryClause(processIDField,processID)});
+ map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+ map.put(processIDField,null);
+ map.put(failTimeField,null);
+ map.put(failCountField,null);
+ performUpdate(map,"WHERE "+query,list,invKey);
+
// Starting up or aborting starting up goes back to just being ready
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -1259,6 +1279,14 @@ public class Jobs extends org.apache.man
map.put(processIDField,null);
performUpdate(map,"WHERE "+query,list,invKey);
+ // Notifying of deletion goes back to just being ready for delete notify
+ list.clear();
+ query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFDELETION))});
+ map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+ map.put(processIDField,null);
+ performUpdate(map,"WHERE "+query,list,invKey);
+
// Starting up or aborting starting up goes back to just being ready
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -1666,6 +1694,17 @@ public class Jobs extends org.apache.man
map.put(failCountField,null);
performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+ list.clear();
+ map.clear();
+ query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(statusField,statusToString(STATUS_NOTIFYINGOFDELETION)),
+ new UnitaryClause(processIDField,processID)});
+ map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+ map.put(processIDField,null);
+ map.put(failTimeField,null);
+ map.put(failCountField,null);
+ performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+
}
/** Reset startup worker thread status.
@@ -1948,7 +1987,32 @@ public class Jobs extends org.apache.man
map.put(processIDField,null);
performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
}
-
+
+ /** Retry delete notification.
+ *@param jobID is the job identifier.
+ *@param failTime is the fail time, -1 == none
+ *@param failCount is the fail count to use, -1 == none.
+ */
+ public void retryDeleteNotification(Long jobID, long failTime, int failCount)
+ throws ManifoldCFException
+ {
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(idField,jobID)});
+ HashMap map = new HashMap();
+ map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+ if (failTime == -1L)
+ map.put(failTimeField,null);
+ else
+ map.put(failTimeField,new Long(failTime));
+ if (failCount == -1)
+ map.put(failCountField,null);
+ else
+ map.put(failCountField,failCount);
+ map.put(processIDField,null);
+ performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+ }
+
/** Write job status and window end, and clear the endtime field. (The start time will be written
* when the job enters the "active" state.)
*@param jobID is the job identifier.
@@ -2758,6 +2822,24 @@ public class Jobs extends org.apache.man
performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
}
+ /** Finish job cleanup.
+ * Moves the job to the "ready for delete notify" state and clears the error field.
+ *@param jobID is the job id.
+ */
+ public void finishJobCleanup(Long jobID)
+ throws ManifoldCFException
+ {
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(idField,jobID)});
+ HashMap map = new HashMap();
+ map.put(statusField,statusToString(STATUS_READYFORDELETENOTIFY));
+ map.put(errorField,null);
+ // Anything else?
+ // MHL
+ performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
+ }
+
/** Resume a stopped job (from a pause or activewait).
* Updates the job record in a manner consistent with the job's state.
*/
@@ -3113,6 +3195,10 @@ public class Jobs extends org.apache.man
return "n";
case STATUS_READYFORNOTIFY:
return "s";
+ case STATUS_NOTIFYINGOFDELETION:
+ return "j";
+ case STATUS_READYFORDELETENOTIFY:
+ return "d";
case STATUS_ACTIVEWAIT:
return "W";
case STATUS_PAUSEDWAIT:
@@ -3157,14 +3243,6 @@ public class Jobs extends org.apache.man
return "R";
case STATUS_ACTIVESEEDING_UNINSTALLED:
return "r";
- case STATUS_ACTIVE_NOOUTPUT:
- return "O";
- case STATUS_ACTIVESEEDING_NOOUTPUT:
- return "o";
- case STATUS_ACTIVE_NEITHER:
- return "U";
- case STATUS_ACTIVESEEDING_NEITHER:
- return "u";
case STATUS_DELETING_NOOUTPUT:
return "D";
@@ -3188,6 +3266,16 @@ public class Jobs extends org.apache.man
case STATUS_ABORTINGSHUTTINGDOWN:
return "v";
+ // These are deprecated
+ case STATUS_ACTIVE_NOOUTPUT:
+ return "O";
+ case STATUS_ACTIVESEEDING_NOOUTPUT:
+ return "o";
+ case STATUS_ACTIVE_NEITHER:
+ return "U";
+ case STATUS_ACTIVESEEDING_NEITHER:
+ return "u";
+
default:
throw new ManifoldCFException("Bad status value: "+Integer.toString(status));
}
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java Fri Jun 20 16:56:19 2014
@@ -249,6 +249,184 @@ public class JobNotificationThread exten
throw exception;
}
+ // Now handle jobs that are being deleted: notify their output connectors, then remove the jobs.
+ JobNotifyRecord[] jobsNeedingDeleteNotification = jobManager.getJobsReadyForDelete(processID);
+ try
+ {
+ Set<OutputAndRepositoryConnection> connectionNames = new HashSet<OutputAndRepositoryConnection>();
+
+ int k = 0;
+ while (k < jobsNeedingDeleteNotification.length)
+ {
+ JobNotifyRecord jsr = jobsNeedingDeleteNotification[k++];
+ Long jobID = jsr.getJobID();
+ IJobDescription job = jobManager.load(jobID,true);
+ if (job != null)
+ {
+ // Get the connection name
+ String repositoryConnectionName = job.getConnectionName();
+ IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
+ for (int i = 0; i < basicSpec.getOutputCount(); i++)
+ {
+ String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
+ OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
+ connectionNames.add(c);
+ }
+ }
+ }
+
+ // Attempt to notify the specified connections
+ Map<OutputAndRepositoryConnection,Disposition> notifiedConnections = new HashMap<OutputAndRepositoryConnection,Disposition>();
+
+ for (OutputAndRepositoryConnection connections : connectionNames)
+ {
+ String outputConnectionName = connections.getOutputConnectionName();
+ String repositoryConnectionName = connections.getRepositoryConnectionName();
+
+ OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
+
+ IOutputConnection connection = connectionManager.load(outputConnectionName);
+ if (connection != null)
+ {
+ // Grab an appropriate connection instance
+ IOutputConnector connector = outputConnectorPool.grab(connection);
+ if (connector != null)
+ {
+ try
+ {
+ // Do the notification itself
+ try
+ {
+ connector.noteJobComplete(activity);
+ notifiedConnections.put(connections,new Disposition());
+ }
+ catch (ServiceInterruption e)
+ {
+ notifiedConnections.put(connections,new Disposition(e));
+ }
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+ throw e;
+ if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+ throw e;
+ if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+ throw e;
+ // Nothing special; report the error and keep going.
+ Logging.threads.error(e.getMessage(),e);
+ }
+ }
+ finally
+ {
+ outputConnectorPool.release(connection,connector);
+ }
+ }
+ }
+ }
+
+ // Go through jobs again, and remove the ones whose notifications did not report a service interruption.
+ k = 0;
+ while (k < jobsNeedingDeleteNotification.length)
+ {
+ JobNotifyRecord jsr = jobsNeedingDeleteNotification[k++];
+ Long jobID = jsr.getJobID();
+ IJobDescription job = jobManager.load(jobID,true);
+ if (job != null)
+ {
+ // Get the connection name
+ String repositoryConnectionName = job.getConnectionName();
+ IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
+ boolean allOK = true;
+ for (int i = 0; i < basicSpec.getOutputCount(); i++)
+ {
+ String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
+
+ OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
+
+ Disposition d = notifiedConnections.get(c);
+ if (d != null)
+ {
+ ServiceInterruption e = d.getServiceInterruption();
+ if (e == null)
+ {
+ break;
+ }
+ else
+ {
+ if (!e.jobInactiveAbort())
+ {
+ Logging.jobs.warn("Delete notification service interruption reported for job "+
+ jobID+" output connection '"+outputConnectionName+"': "+
+ e.getMessage(),e);
+ }
+
+ // If either we are going to be requeuing beyond the fail time, OR
+ // the number of retries available has hit 0, THEN we treat this
+ // as either an "ignore" or a hard error.
+ if (!e.jobInactiveAbort() && (jsr.getFailTime() != -1L && jsr.getFailTime() < e.getRetryTime() ||
+ jsr.getFailRetryCount() == 0))
+ {
+ // Treat this as a hard failure.
+ if (e.isAbortOnFail())
+ {
+ // Note the error in the job, and transition to inactive state
+ String message = e.jobInactiveAbort()?"":"Repeated service interruptions during notification"+((e.getCause()!=null)?": "+e.getCause().getMessage():"");
+ if (jobManager.errorAbort(jobID,message) && message.length() > 0)
+ Logging.jobs.error(message,e.getCause());
+ jsr.noteStarted();
+ }
+ else
+ {
+ // Not sure this can happen -- but just remove the job silently
+ jobManager.removeJob(jobID);
+ jsr.noteStarted();
+ }
+ }
+ else
+ {
+ // Reset the job to the READYFORDELETENOTIFY state, updating the failtime and failcount fields
+ jobManager.retryDeleteNotification(jsr,e.getFailTime(),e.getFailRetryCount());
+ jsr.noteStarted();
+ }
+ allOK = false;
+ break;
+ }
+ }
+ }
+ if (allOK)
+ {
+ jobManager.removeJob(jobID);
+ jsr.noteStarted();
+ }
+
+ }
+ }
+ }
+ finally
+ {
+ // Clean up all jobs that did not start
+ ManifoldCFException exception = null;
+ int i = 0;
+ while (i < jobsNeedingDeleteNotification.length)
+ {
+ JobNotifyRecord jsr = jobsNeedingDeleteNotification[i++];
+ if (!jsr.wasStarted())
+ {
+ // Clean up from failed start.
+ try
+ {
+ jobManager.resetDeleteNotifyJob(jsr.getJobID());
+ }
+ catch (ManifoldCFException e)
+ {
+ exception = e;
+ }
+ }
+ }
+ if (exception != null)
+ throw exception;
+ }
+
ManifoldCF.sleep(10000L);
}
catch (ManifoldCFException e)
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java?rev=1604213&r1=1604212&r2=1604213&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartDeleteThread.java Fri Jun 20 16:56:19 2014
@@ -84,7 +84,7 @@ public class StartDeleteThread extends T
// See if there are any starting jobs.
// Note: Since this following call changes the job state, we must be careful to reset it on any kind of failure.
- JobDeleteRecord[] deleteJobs = jobManager.getJobsReadyForDelete(processID);
+ JobDeleteRecord[] deleteJobs = jobManager.getJobsReadyForDeleteCleanup(processID);
try
{