Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/14 03:21:54 UTC
svn commit: r1058837 - in /incubator/lcf/trunk: ./
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/
framework/pull-agent/src/main/java/org/apache/manifol...
Author: kwright
Date: Fri Jan 14 02:21:53 2011
New Revision: 1058837
URL: http://svn.apache.org/viewvc?rev=1058837&view=rev
Log:
Fix for CONNECTORS-145.
Modified:
incubator/lcf/trunk/CHANGES.txt
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
Modified: incubator/lcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/CHANGES.txt?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/CHANGES.txt (original)
+++ incubator/lcf/trunk/CHANGES.txt Fri Jan 14 02:21:53 2011
@@ -3,6 +3,10 @@ $Id$
================== 0.2-dev ==================
+CONNECTORS-145: Refactor cleanup worker threads, expire threads, and
+document delete threads to handle failure of the output connection gracefully.
+(Karl Wright)
+
CONNECTORS-130: Block the Solr output connector from accepting documents
that have folder-level security.
(Karl Wright)
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Fri Jan 14 02:21:53 2011
@@ -360,20 +360,25 @@ public interface IJobManager
* current status and decide what the new status ought to be, based on a true rollback scenario. Such cases, however, are rare enough so that
* special logic is probably not worth it.
*@param documentDescriptions is the set of description objects for the document that was processed.
+ *@param checkTime is the minimum time for the next deletion attempt.
*/
- public void resetDeletingDocumentMultiple(DocumentDescription[] documentDescriptions)
+ public void resetDeletingDocumentMultiple(DocumentDescription[] documentDescriptions, long checkTime)
throws ManifoldCFException;
/** Reset a deleting document back to its former state.
* This gets done when a deleting thread sees a service interruption, etc., from the ingestion system.
+ *@param documentDescription is the description object for the document that was being deleted.
+ *@param checkTime is the minimum time for the next deletion attempt.
*/
- public void resetDeletingDocument(DocumentDescription documentDescription)
+ public void resetDeletingDocument(DocumentDescription documentDescription, long checkTime)
throws ManifoldCFException;
/** Reset a cleaning document back to its former state.
- * This gets done when a deleting thread sees a service interruption, etc., from the ingestion system.
+ * This gets done when a cleaning thread sees a service interruption, etc., from the ingestion system.
+ *@param documentDescription is the description object for the document that was cleaned.
+ *@param checkTime is the minimum time for the next cleaning attempt.
*/
- public void resetCleaningDocument(DocumentDescription documentDescription)
+ public void resetCleaningDocument(DocumentDescription documentDescription, long checkTime)
throws ManifoldCFException;
/** Reset a set of cleaning documents for further processing in the future.
@@ -383,8 +388,9 @@ public interface IJobManager
* current status and decide what the new status ought to be, based on a true rollback scenario. Such cases, however, are rare enough so that
* special logic is probably not worth it.
*@param documentDescriptions is the set of description objects for the document that was cleaned.
+ *@param checkTime is the minimum time for the next cleaning attempt.
*/
- public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+ public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions, long checkTime)
throws ManifoldCFException;
/** Add an initial set of documents to the queue.
@@ -744,17 +750,19 @@ public interface IJobManager
/** Get list of deletable document descriptions. This list will take into account
* multiple jobs that may own the same document.
*@param n is the maximum number of documents to return.
+ *@param currentTime is the current time; some fetches do not occur until a specific time.
*@return the document descriptions for these documents.
*/
- public DocumentDescription[] getNextDeletableDocuments(int n)
+ public DocumentDescription[] getNextDeletableDocuments(int n, long currentTime)
throws ManifoldCFException;
/** Get list of cleanable document descriptions. This list will take into account
* multiple jobs that may own the same document.
*@param n is the maximum number of documents to return.
+ *@param currentTime is the current time; some fetches do not occur until a specific time.
*@return the document descriptions for these documents.
*/
- public DocumentSetAndFlags getNextCleanableDocuments(int n)
+ public DocumentSetAndFlags getNextCleanableDocuments(int n, long currentTime)
throws ManifoldCFException;
/** Delete ingested document identifiers (as part of deleting the owning job).
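In effect: a delete or cleanup thread that receives a ServiceInterruption from the
output connection now passes the interruption's retry time in as checkTime, and the
stuffer threads pass the current clock time so that gated documents are skipped until
they become eligible again. A minimal sketch of the intended calling pattern, using
only the signatures above (the surrounding thread state and declarations are elided):

  try
  {
    ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
  }
  catch (ServiceInterruption e)
  {
    // The output connection is down; gate this document until the prescribed retry time.
    jobManager.resetCleaningDocument(documentDescription,e.getRetryTime());
  }

  // A stuffer thread later fetches only documents whose gate has expired:
  DocumentSetAndFlags chunk =
    jobManager.getNextCleanableDocuments(chunkSize,System.currentTimeMillis());
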
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Fri Jan 14 02:21:53 2011
@@ -781,9 +781,10 @@ public class JobManager implements IJobM
* not in transition and are eligible, but are owned by other jobs, will have their
* jobqueue entries deleted by this method.
*@param maxCount is the maximum number of documents to return.
+ *@param currentTime is the current time; some fetches do not occur until a specific time.
*@return the document descriptions for these documents.
*/
- public DocumentSetAndFlags getNextCleanableDocuments(int maxCount)
+ public DocumentSetAndFlags getNextCleanableDocuments(int maxCount, long currentTime)
throws ManifoldCFException
{
// The query will be built here, because it joins the jobs table against the jobqueue
@@ -830,6 +831,8 @@ public class JobManager implements IJobM
ArrayList list = new ArrayList();
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
+ list.add(new Long(currentTime));
+
list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
@@ -839,9 +842,11 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+ // The "checktime IS NULL" check is for backwards compatibility
IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+"=? "+
+ " AND (t0."+jobQueue.checkTimeField+" IS NULL OR t0."+jobQueue.checkTimeField+"<=?) "+
" AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
" AND t1."+jobs.statusField+"=?"+
") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
@@ -1006,9 +1011,10 @@ public class JobManager implements IJobM
* not in transition and are eligible, but are owned by other jobs, will have their
* jobqueue entries deleted by this method.
*@param maxCount is the maximum number of documents to return.
+ *@param currentTime is the current time; some fetches do not occur until a specific time.
*@return the document descriptions for these documents.
*/
- public DocumentDescription[] getNextDeletableDocuments(int maxCount)
+ public DocumentDescription[] getNextDeletableDocuments(int maxCount, long currentTime)
throws ManifoldCFException
{
// The query will be built here, because it joins the jobs table against the jobqueue
@@ -1059,6 +1065,8 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
+ list.add(new Long(currentTime));
+
list.add(jobs.statusToString(jobs.STATUS_READYFORDELETE));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
@@ -1068,9 +1076,11 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+ // The "checktime IS NULL" check is for backwards compatibility
IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+" IN (?,?,?) "+
+ " AND (t0."+jobQueue.checkTimeField+" IS NULL OR t0."+jobQueue.checkTimeField+"<=?) "+
" AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
" AND t1."+jobs.statusField+"=?"+
") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
@@ -2807,8 +2817,9 @@ public class JobManager implements IJobM
* current status and decide what the new status ought to be, based on a true rollback scenario. Such cases, however, are rare enough so that
* special logic is probably not worth it.
*@param documentDescriptions is the set of description objects for the document that was cleaned.
+ *@param checkTime is the minimum time for the next cleaning attempt.
*/
- public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+ public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions, long checkTime)
throws ManifoldCFException
{
Long[] ids = new Long[documentDescriptions.length];
@@ -2851,7 +2862,7 @@ public class JobManager implements IJobM
i = 0;
while (i < ids.length)
{
- jobQueue.setUncleaningStatus(ids[i]);
+ jobQueue.setUncleaningStatus(ids[i],checkTime);
i++;
}
@@ -2884,11 +2895,13 @@ public class JobManager implements IJobM
/** Reset a cleaning document back to its former state.
* This gets done when a cleaning thread sees a service interruption, etc., from the ingestion system.
+ *@param documentDescription is the description of the document that was cleaned.
+ *@param checkTime is the minimum time for the next cleaning attempt.
*/
- public void resetCleaningDocument(DocumentDescription documentDescription)
+ public void resetCleaningDocument(DocumentDescription documentDescription, long checkTime)
throws ManifoldCFException
{
- resetCleaningDocumentMultiple(new DocumentDescription[]{documentDescription});
+ resetCleaningDocumentMultiple(new DocumentDescription[]{documentDescription},checkTime);
}
/** Reset a set of deleting documents for further processing in the future.
@@ -2898,8 +2911,9 @@ public class JobManager implements IJobM
* current status and decide what the new status ought to be, based on a true rollback scenario. Such cases, however, are rare enough so that
* special logic is probably not worth it.
*@param documentDescriptions is the set of description objects for the document that was processed.
+ *@param checkTime is the minimum time for the next deletion attempt.
*/
- public void resetDeletingDocumentMultiple(DocumentDescription[] documentDescriptions)
+ public void resetDeletingDocumentMultiple(DocumentDescription[] documentDescriptions, long checkTime)
throws ManifoldCFException
{
Long[] ids = new Long[documentDescriptions.length];
@@ -2942,7 +2956,7 @@ public class JobManager implements IJobM
i = 0;
while (i < ids.length)
{
- jobQueue.setUndeletingStatus(ids[i]);
+ jobQueue.setUndeletingStatus(ids[i],checkTime);
i++;
}
@@ -2975,11 +2989,13 @@ public class JobManager implements IJobM
/** Reset a deleting document back to its former state.
* This gets done when a deleting thread sees a service interruption, etc., from the ingestion system.
+ *@param documentDescription is the description object for the document that was being deleted.
+ *@param checkTime is the minimum time for the next deletion attempt.
*/
- public void resetDeletingDocument(DocumentDescription documentDescription)
+ public void resetDeletingDocument(DocumentDescription documentDescription, long checkTime)
throws ManifoldCFException
{
- resetDeletingDocumentMultiple(new DocumentDescription[]{documentDescription});
+ resetDeletingDocumentMultiple(new DocumentDescription[]{documentDescription},checkTime);
}
@@ -6266,6 +6282,7 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
list.add(jobQueue.actionToString(jobQueue.ACTION_RESCAN));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
@@ -6337,7 +6354,9 @@ public class JobManager implements IJobM
.append(" OR ").append("t0.").append(jobQueue.statusField).append("=?")
.append(")")
.append(" THEN 'Waiting forever'")
- .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=?")
+ .append(" WHEN (").append("t0.").append(jobQueue.statusField).append("=?")
+ .append(" OR ").append("t0.").append(jobQueue.statusField).append("=?)")
+ .append(")")
.append(" THEN 'Deleting'")
.append(" WHEN ")
.append("(t0.").append(jobQueue.checkActionField).append(" IS NULL OR t0.").append(jobQueue.checkActionField).append("=?)")
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Fri Jan 14 02:21:53 2011
@@ -308,12 +308,14 @@ public class JobQueue extends org.apache
// Map BEINGDELETED to COMPLETE
map.put(statusField,statusToString(STATUS_COMPLETE));
+ map.put(checkTimeField,new Long(0L));
list.clear();
list.add(statusToString(STATUS_BEINGDELETED));
performUpdate(map,"WHERE "+statusField+"=?",list,null);
// Map BEINGCLEANED to PURGATORY
map.put(statusField,statusToString(STATUS_PURGATORY));
+ map.put(checkTimeField,new Long(0L));
list.clear();
list.add(statusToString(STATUS_BEINGCLEANED));
performUpdate(map,"WHERE "+statusField+"=?",list,null);
@@ -382,6 +384,7 @@ public class JobQueue extends org.apache
ArrayList list = new ArrayList();
// Map BEINGDELETED to COMPLETE
map.put(statusField,statusToString(STATUS_COMPLETE));
+ map.put(checkTimeField,new Long(0L));
list.clear();
list.add(statusToString(STATUS_BEINGDELETED));
performUpdate(map,"WHERE "+statusField+"=?",list,null);
@@ -396,6 +399,7 @@ public class JobQueue extends org.apache
ArrayList list = new ArrayList();
// Map BEINGCLEANED to PURGATORY
map.put(statusField,statusToString(STATUS_PURGATORY));
+ map.put(checkTimeField,new Long(0L));
list.clear();
list.add(statusToString(STATUS_BEINGCLEANED));
performUpdate(map,"WHERE "+statusField+"=?",list,null);
@@ -423,7 +427,7 @@ public class JobQueue extends org.apache
// Turn PENDINGPURGATORY, COMPLETED into PURGATORY.
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PURGATORY));
- map.put(checkTimeField,null);
+ map.put(checkTimeField,new Long(0L));
map.put(checkActionField,null);
map.put(failTimeField,null);
map.put(failCountField,null);
@@ -555,7 +559,7 @@ public class JobQueue extends org.apache
case STATUS_ACTIVEPURGATORY:
newStatus = STATUS_COMPLETE;
actionFieldValue = null;
- checkTimeValue = null;
+ checkTimeValue = new Long(0L);
break;
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
@@ -653,12 +657,12 @@ public class JobQueue extends org.apache
}
/** Set the status of a document to be "no longer deleting" */
- public void setUndeletingStatus(Long id)
+ public void setUndeletingStatus(Long id, long checkTime)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_COMPLETE));
- map.put(checkTimeField,null);
+ map.put(checkTimeField,new Long(checkTime));
map.put(checkActionField,null);
map.put(failTimeField,null);
map.put(failCountField,null);
@@ -681,12 +685,12 @@ public class JobQueue extends org.apache
}
/** Set the status of a document to be "no longer cleaning" */
- public void setUncleaningStatus(Long id)
+ public void setUncleaningStatus(Long id, long checkTime)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PURGATORY));
- map.put(checkTimeField,null);
+ map.put(checkTimeField,new Long(checkTime));
map.put(checkActionField,null);
map.put(failTimeField,null);
map.put(failCountField,null);
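Two conventions fall out of these JobQueue changes: reset paths that used to null out
the checktime now write an explicit 0 (immediately eligible), and the two rollback
methods stamp the caller-supplied gate. The eligibility rule that the fetch queries
rely on can be stated as a one-line predicate (a sketch only; no such helper exists
in JobQueue):

  // Hypothetical helper: a row is fetchable once its gate has passed.
  // A null checkTime (a row written before this change) counts as eligible.
  static boolean isEligible(Long checkTime, long now)
  {
    return checkTime == null || checkTime.longValue() <= now;
  }
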
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java Fri Jan 14 02:21:53 2011
@@ -32,13 +32,25 @@ public class DocumentCleanupSet
/** This is the array of documents to delete. */
protected CleanupQueuedDocument[] documents;
+ /** The job description for this set of documents. */
+ protected IJobDescription jobDescription;
/** Constructor.
*@param documents is the arraylist representing the documents for this chunk.
+ *@param jobDescription is the job description for all the documents.
*/
- public DocumentCleanupSet(CleanupQueuedDocument[] documents)
+ public DocumentCleanupSet(CleanupQueuedDocument[] documents, IJobDescription jobDescription)
{
this.documents = documents;
+ this.jobDescription = jobDescription;
+ }
+
+ /** Get the job description.
+ *@return the job description.
+ */
+ public IJobDescription getJobDescription()
+ {
+ return this.jobDescription;
}
/** Get the number of documents.
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java Fri Jan 14 02:21:53 2011
@@ -96,11 +96,13 @@ public class DocumentCleanupStufferThrea
Logging.threads.debug("Document cleanup stuffer thread woke up");
+ long currentTime = System.currentTimeMillis();
+
+ // Get a single chunk at a time (but keep going until everything is stuffed)
// This method will set the status of the documents in question
// to "beingcleaned".
- // Get a single chunk at a time (but keep going until everything is stuffed)
- DocumentSetAndFlags documentsToClean = jobManager.getNextCleanableDocuments(deleteChunkSize);
+ DocumentSetAndFlags documentsToClean = jobManager.getNextCleanableDocuments(deleteChunkSize,currentTime);
DocumentDescription[] descs = documentsToClean.getDocumentSet();
boolean[] removeFromIndex = documentsToClean.getFlags();
@@ -116,16 +118,39 @@ public class DocumentCleanupStufferThrea
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Document cleanup stuffer thread found "+Integer.toString(descs.length)+" documents");
- // Do the stuffing
- CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[descs.length];
+ // Do the stuffing. Each set must be segregated by job, since we need the job ID in the doc set.
+ Map jobMap = new HashMap();
int k = 0;
- while (k < docDescs.length)
+ while (k < descs.length)
{
- docDescs[k] = new CleanupQueuedDocument(descs[k],removeFromIndex[k]);
+ CleanupQueuedDocument x = new CleanupQueuedDocument(descs[k],removeFromIndex[k]);
+ Long jobID = descs[k].getJobID();
+ List y = (List)jobMap.get(jobID);
+ if (y == null)
+ {
+ y = new ArrayList();
+ jobMap.put(jobID,y);
+ }
+ y.add(x);
k++;
}
- DocumentCleanupSet set = new DocumentCleanupSet(docDescs);
- documentCleanupQueue.addDocuments(set);
+
+ Iterator iter = jobMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ Long jobID = (Long)iter.next();
+ IJobDescription jobDescription = jobManager.load(jobID,true);
+ List y = (List)jobMap.get(jobID);
+ CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[y.size()];
+ k = 0;
+ while (k < docDescs.length)
+ {
+ docDescs[k] = (CleanupQueuedDocument)y.get(k);
+ k++;
+ }
+ DocumentCleanupSet set = new DocumentCleanupSet(docDescs,jobDescription);
+ documentCleanupQueue.addDocuments(set);
+ }
// If we don't wait here, the other threads don't have a chance to queue anything else up.
yield();
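The reason each set must contain documents from a single job becomes clear on the
consumer side: DocumentCleanupThread (below) reads exactly one job description per
set and derives both connection names from it. Sketched, with the queue accessor
name assumed rather than taken from this diff:

  DocumentCleanupSet dds = documentCleanupQueue.getDocuments(); // accessor name assumed
  IJobDescription job = dds.getJobDescription();
  String connectionName = job.getConnectionName();              // one repository connection per set
  String outputConnectionName = job.getOutputConnectionName();  // one output connection per set
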
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Fri Jan 14 02:21:53 2011
@@ -104,198 +104,164 @@ public class DocumentCleanupThread exten
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+ IJobDescription job = dds.getJobDescription();
+ String connectionName = job.getConnectionName();
+ String outputConnectionName = job.getOutputConnectionName();
+
try
{
long currentTime = System.currentTimeMillis();
- // We need to segregate all the documents by connection, in order to be able to form a decent activities object
- // to pass into the incremental ingester. So, first pass through the document descriptions will build that.
- Map mappedDocs = new HashMap();
+ // Load the repository connection shared by all documents in this (single-job) set
+ IRepositoryConnection connection = connMgr.load(connectionName);
+
+ // This is where we store the hopcount cleanup data
+ ArrayList arrayDocHashes = new ArrayList();
+ ArrayList arrayDocsToDelete = new ArrayList();
+ ArrayList arrayRelationshipTypes = new ArrayList();
+ ArrayList hopcountMethods = new ArrayList();
+
int j = 0;
while (j < dds.getCount())
{
- CleanupQueuedDocument dqd = dds.getDocument(j++);
+ CleanupQueuedDocument dqd = dds.getDocument(j);
DocumentDescription ddd = dqd.getDocumentDescription();
- Long jobID = ddd.getJobID();
- IJobDescription job = jobManager.load(jobID,true);
- String connectionName = job.getConnectionName();
- ArrayList list = (ArrayList)mappedDocs.get(connectionName);
- if (list == null)
+ if (job != null && connection != null)
{
- list = new ArrayList();
- mappedDocs.put(connectionName,list);
+ // We'll need the legal link types; grab those before we proceed
+ String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
+ if (legalLinkTypes != null)
+ {
+ arrayDocHashes.add(ddd.getDocumentIdentifierHash());
+ arrayDocsToDelete.add(dqd);
+ arrayRelationshipTypes.add(legalLinkTypes);
+ hopcountMethods.add(new Integer(job.getHopcountMode()));
+ }
}
- list.add(dqd);
+ j++;
}
- // Now, cycle through all represented connections.
- // For each connection, construct the necessary pieces to do the deletion.
- Iterator iter = mappedDocs.keySet().iterator();
- while (iter.hasNext())
+ // Grab one connector instance for the connection. If we fail, nothing is lost and retries are possible.
+ try
{
- String connectionName = (String)iter.next();
- ArrayList list = (ArrayList)mappedDocs.get(connectionName);
-
- // Produce a map of connection name->connection object. We will use this to perform a request for multiple connector objects
- IRepositoryConnection connection = connMgr.load(connectionName);
- ArrayList arrayOutputConnectionNames = new ArrayList();
- ArrayList arrayDocHashes = new ArrayList();
- ArrayList arrayDocClasses = new ArrayList();
- ArrayList arrayDocsToDelete = new ArrayList();
- ArrayList arrayRelationshipTypes = new ArrayList();
- ArrayList hopcountMethods = new ArrayList();
- ArrayList connections = new ArrayList();
- j = 0;
- while (j < list.size())
+ IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+ try
{
- CleanupQueuedDocument dqd = (CleanupQueuedDocument)list.get(j);
- DocumentDescription ddd = dqd.getDocumentDescription();
- Long jobID = ddd.getJobID();
- IJobDescription job = jobManager.load(jobID,true);
- if (job != null && connection != null)
+ // Track, per document, whether it may be removed from the job queue
+ boolean[] deleteFromQueue = new boolean[arrayDocHashes.size()];
+
+ // Count the number of docs to actually delete. This will be a subset of the documents in the list.
+ int k = 0;
+ int removeCount = 0;
+ while (k < arrayDocHashes.size())
{
- // We'll need the legal link types; grab those before we proceed
- String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
- if (legalLinkTypes != null)
+ if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
{
- arrayOutputConnectionNames.add(job.getOutputConnectionName());
- arrayDocClasses.add(connectionName);
- arrayDocHashes.add(ddd.getDocumentIdentifierHash());
- arrayDocsToDelete.add(dqd);
- arrayRelationshipTypes.add(legalLinkTypes);
- hopcountMethods.add(new Integer(job.getHopcountMode()));
+ deleteFromQueue[k] = false;
+ removeCount++;
}
+ else
+ deleteFromQueue[k] = true;
+ k++;
}
- j++;
- }
-
- // Next, segregate the documents by output connection name. This will permit logging to know what actual activity type to use.
- HashMap outputMap = new HashMap();
- j = 0;
- while (j < arrayDocHashes.size())
- {
- String outputConnectionName = (String)arrayOutputConnectionNames.get(j);
- ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
- if (subList == null)
+
+ // Allocate removal arrays
+ String[] docClassesToRemove = new String[removeCount];
+ String[] hashedDocsToRemove = new String[removeCount];
+
+ // Now, iterate over the list
+ k = 0;
+ removeCount = 0;
+ while (k < arrayDocHashes.size())
{
- subList = new ArrayList();
- outputMap.put(outputConnectionName,subList);
+ if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ {
+ docClassesToRemove[removeCount] = connectionName;
+ hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(k);
+ removeCount++;
+ }
+ k++;
}
- subList.add(new Integer(j));
- j++;
- }
+
+ OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+
+ // Finally, go ahead and delete the documents from the ingestion system.
- // Grab one connection for each connectionName. If we fail, nothing is lost and retries are possible.
- try
- {
- IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
try
{
-
- // Iterate over the outputs
- Iterator outputIterator = outputMap.keySet().iterator();
- while (outputIterator.hasNext())
+ ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+ // Success! Label all these as needing deletion from queue.
+ k = 0;
+ while (k < arrayDocHashes.size())
{
- String outputConnectionName = (String)outputIterator.next();
- ArrayList indexList = (ArrayList)outputMap.get(outputConnectionName);
- // Count the number of docs to actually delete. This will be a subset of the documents in the list.
- int k = 0;
- int removeCount = 0;
- while (k < indexList.size())
- {
- int index = ((Integer)indexList.get(k++)).intValue();
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
- removeCount++;
- }
-
- // Allocate removal arrays
- String[] docClassesToRemove = new String[removeCount];
- String[] hashedDocsToRemove = new String[removeCount];
-
- // Now, iterate over the index list
- k = 0;
- removeCount = 0;
- while (k < indexList.size())
- {
- int index = ((Integer)indexList.get(k)).intValue();
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
- {
- docClassesToRemove[removeCount] = (String)arrayDocClasses.get(index);
- hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(index);
- removeCount++;
- }
- k++;
- }
-
- OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
-
- // Finally, go ahead and delete the documents from the ingestion system.
-
- while (true)
- {
- try
- {
- ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
- break;
- }
- catch (ServiceInterruption e)
- {
- // If we get a service interruption here, it means that the ingestion API is down.
- // There is no point, therefore, in freeing up this thread to go do something else;
- // might as well just wait here for our retries.
- // Wait for the prescribed time
- long amt = e.getRetryTime();
- long now = System.currentTimeMillis();
- long waittime = amt-now;
- if (waittime <= 0L)
- waittime = 300000L;
- ManifoldCF.sleep(waittime);
- }
- }
-
- // Successfully deleted some documents from ingestion system. Now, remove them from job queue. This
- // must currently happen one document at a time, because the jobs and connectors for each document
- // potentially differ.
- k = 0;
- while (k < indexList.size())
+ if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ deleteFromQueue[k] = true;
+ k++;
+ }
+ }
+ catch (ServiceInterruption e)
+ {
+ // We don't know which failed, or maybe they all did.
+ // Go through the list of documents we just tried, and reset them on the queue based on the
+ // ServiceInterruption parameters. Then we must proceed to delete ONLY the documents that
+ // were not part of the index deletion attempt.
+ k = 0;
+ while (k < arrayDocHashes.size())
+ {
+ CleanupQueuedDocument cqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+ if (cqd.shouldBeRemovedFromIndex())
{
- int index = ((Integer)indexList.get(k)).intValue();
-
- DeleteQueuedDocument dqd = (DeleteQueuedDocument)arrayDocsToDelete.get(index);
- DocumentDescription ddd = dqd.getDocumentDescription();
- Long jobID = ddd.getJobID();
- int hopcountMethod = ((Integer)hopcountMethods.get(index)).intValue();
- String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(index);
- DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
- // Use the common method for doing the requeuing
- ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
- connector,connection,queueTracker,currentTime);
- // Finally, completed expiration of the document.
- dqd.setProcessed();
- k++;
+ DocumentDescription dd = cqd.getDocumentDescription();
+ // To recover from a cleanup failure, requeue the document to PURGATORY.
+ jobManager.resetCleaningDocument(dd,e.getRetryTime());
+ cqd.setProcessed();
}
+ k++;
}
}
- finally
+
+ // Successfully deleted some documents from ingestion system. Now, remove them from job queue. This
+ // must currently happen one document at a time, because the jobs and connectors for each document
+ // potentially differ.
+ k = 0;
+ while (k < arrayDocHashes.size())
{
- // Free up the reserved connector instance
- RepositoryConnectorFactory.release(connector);
+ if (deleteFromQueue[k])
+ {
+ DeleteQueuedDocument dqd = (DeleteQueuedDocument)arrayDocsToDelete.get(k);
+ DocumentDescription ddd = dqd.getDocumentDescription();
+ Long jobID = ddd.getJobID();
+ int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
+ String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
+ DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+ // Use the common method for doing the requeuing
+ ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
+ connector,connection,queueTracker,currentTime);
+ // Finally, completed expiration of the document.
+ dqd.setProcessed();
+ }
+ k++;
}
}
- catch (ManifoldCFException e)
+ finally
{
- if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
- {
- // This error can only come from grabbing the connections. So, if this occurs it means that
- // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
- Logging.threads.warn("Document cleanup thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
+ // Free up the reserved connector instance
+ RepositoryConnectorFactory.release(connector);
+ }
+ }
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
+ {
+ // This error can only come from grabbing the connections. So, if this occurs it means that
+ // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
+ Logging.threads.warn("Document cleanup thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
- // Let the unprocessed documents get requeued! This is handled at the end of the loop...
- }
- else
- throw e;
+ // Let the unprocessed documents get requeued! This is handled at the end of the loop...
}
+ else
+ throw e;
}
}
catch (ManifoldCFException e)
@@ -306,7 +272,15 @@ public class DocumentCleanupThread exten
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
throw e;
- Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+ // It's ok to abort a job because we can't talk to the search engine during cleanup.
+ if (jobManager.errorAbort(dds.getJobDescription().getID(),e.getMessage()))
+ {
+ // We eat the exception if there was already one recorded.
+
+ // An exception occurred in the processing of a set of documents.
+ // Shut the corresponding job down, with an appropriate error
+ Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+ }
}
finally
{
@@ -319,7 +293,7 @@ public class DocumentCleanupThread exten
{
DocumentDescription ddd = dqd.getDocumentDescription();
// Requeue this document!
- jobManager.resetCleaningDocument(ddd);
+ jobManager.resetCleaningDocument(ddd,0L);
dqd.setProcessed();
}
j++;
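Condensed, the recovery logic above works as follows: index-removal candidates start
with deleteFromQueue[k] == false and are flipped to true only if the bulk delete
succeeds; on a ServiceInterruption they are instead requeued with the interruption's
retry time and marked processed, so the final loop touches only documents whose index
state is known. A sketch over the same arrays (error handling elided):

  try
  {
    ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
    // Success: the index-removal candidates may now leave the queue as well.
    for (int k = 0; k < deleteFromQueue.length; k++)
    {
      if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
        deleteFromQueue[k] = true;
    }
  }
  catch (ServiceInterruption e)
  {
    // Failure: gate the candidates until the retry time; deleteFromQueue[k] stays false.
    for (int k = 0; k < deleteFromQueue.length; k++)
    {
      CleanupQueuedDocument cqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
      if (cqd.shouldBeRemovedFromIndex())
      {
        jobManager.resetCleaningDocument(cqd.getDocumentDescription(),e.getRetryTime());
        cqd.setProcessed();
      }
    }
  }
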
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java Fri Jan 14 02:21:53 2011
@@ -32,15 +32,27 @@ public class DocumentDeleteSet
/** This is the array of documents to delete. */
protected DeleteQueuedDocument[] documents;
-
+ /** The job description for this set of documents. */
+ protected IJobDescription jobDescription;
+
/** Constructor.
*@param documents is the arraylist representing the documents for this chunk.
+ *@param jobDescription is the job description for all the documents.
*/
- public DocumentDeleteSet(DeleteQueuedDocument[] documents)
+ public DocumentDeleteSet(DeleteQueuedDocument[] documents, IJobDescription jobDescription)
{
this.documents = documents;
+ this.jobDescription = jobDescription;
}
+ /** Get the job description.
+ *@return the job description.
+ */
+ public IJobDescription getJobDescription()
+ {
+ return this.jobDescription;
+ }
+
/** Get the number of documents.
*@return the number.
*/
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java Fri Jan 14 02:21:53 2011
@@ -96,11 +96,13 @@ public class DocumentDeleteStufferThread
Logging.threads.debug("Document delete stuffer thread woke up");
+ long currentTime = System.currentTimeMillis();
+
+ // Get a single chunk at a time (but keep going until everything is stuffed)
// This method will set the status of the documents in question
// to "beingdeleted".
- // Get a single chunk at a time (but keep going until everything is stuffed)
- DocumentDescription[] descs = jobManager.getNextDeletableDocuments(deleteChunkSize);
+ DocumentDescription[] descs = jobManager.getNextDeletableDocuments(deleteChunkSize,currentTime);
// If there are no chunks at all, then we can sleep for a while.
// The theory is that we need to allow stuff to accumulate.
@@ -114,16 +116,39 @@ public class DocumentDeleteStufferThread
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Document delete stuffer thread found "+Integer.toString(descs.length)+" documents");
- // Do the stuffing
- DeleteQueuedDocument[] docDescs = new DeleteQueuedDocument[descs.length];
+ // Do the stuffing. Each set must be segregated by job, since we need the job ID in the doc set.
+ Map jobMap = new HashMap();
int k = 0;
- while (k < docDescs.length)
+ while (k < descs.length)
{
- docDescs[k] = new DeleteQueuedDocument(descs[k]);
+ DeleteQueuedDocument x = new DeleteQueuedDocument(descs[k]);
+ Long jobID = descs[k].getJobID();
+ List y = (List)jobMap.get(jobID);
+ if (y == null)
+ {
+ y = new ArrayList();
+ jobMap.put(jobID,y);
+ }
+ y.add(x);
k++;
}
- DocumentDeleteSet set = new DocumentDeleteSet(docDescs);
- documentDeleteQueue.addDocuments(set);
+
+ Iterator iter = jobMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ Long jobID = (Long)iter.next();
+ IJobDescription jobDescription = jobManager.load(jobID,true);
+ List y = (List)jobMap.get(jobID);
+ DeleteQueuedDocument[] docDescs = new DeleteQueuedDocument[y.size()];
+ k = 0;
+ while (k < docDescs.length)
+ {
+ docDescs[k] = (DeleteQueuedDocument)y.get(k);
+ k++;
+ }
+ DocumentDeleteSet set = new DocumentDeleteSet(docDescs,jobDescription);
+ documentDeleteQueue.addDocuments(set);
+ }
// If we don't wait here, the other threads don't have a chance to queue anything else up.
yield();
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java Fri Jan 14 02:21:53 2011
@@ -91,6 +91,10 @@ public class DocumentDeleteThread extend
// Reset
continue;
+ IJobDescription job = dds.getJobDescription();
+ String connectionName = job.getConnectionName();
+ String outputConnectionName = job.getOutputConnectionName();
+
try
{
// Do the delete work.
@@ -99,116 +103,82 @@ public class DocumentDeleteThread extend
// with the individual connection, so the first job is to segregate what came in into connection bins. Then, we process each connection
// bin appropriately.
- // This is a map keyed by connection name, and containing elements that are an ArrayList of DeleteQueuedDocument objects.
- Map mappedDocs = new HashMap();
+ boolean[] deleteFromQueue = new boolean[dds.getCount()];
+
+ String[] docClassesToRemove = new String[dds.getCount()];
+ String[] hashedDocsToRemove = new String[dds.getCount()];
+ DeleteQueuedDocument[] docsToDelete = new DeleteQueuedDocument[dds.getCount()];
int j = 0;
while (j < dds.getCount())
{
- DeleteQueuedDocument dqd = dds.getDocument(j++);
+ DeleteQueuedDocument dqd = dds.getDocument(j);
DocumentDescription ddd = dqd.getDocumentDescription();
- Long jobID = ddd.getJobID();
- IJobDescription job = jobManager.load(jobID,true);
- String connectionName = job.getConnectionName();
- ArrayList list = (ArrayList)mappedDocs.get(connectionName);
- if (list == null)
- {
- list = new ArrayList();
- mappedDocs.put(connectionName,list);
- }
- list.add(dqd);
+ docClassesToRemove[j] = connectionName;
+ hashedDocsToRemove[j] = ddd.getDocumentIdentifierHash();
+ docsToDelete[j] = dqd;
+ deleteFromQueue[j] = false;
+ j++;
}
-
- // For each connection, construct the necessary pieces to do the deletion.
- Iterator iter = mappedDocs.keySet().iterator();
- while (iter.hasNext())
+
+ OutputRemoveActivity logger = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+
+ try
{
- String connectionName = (String)iter.next();
- ArrayList list = (ArrayList)mappedDocs.get(connectionName);
-
- // Segregate by output connection as well.
- HashMap outputMap = new HashMap();
+ ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,logger);
j = 0;
- while (j < list.size())
+ while (j < dds.getCount())
{
- DeleteQueuedDocument dqd = (DeleteQueuedDocument)list.get(j);
- DocumentDescription ddd = dqd.getDocumentDescription();
- Long jobID = ddd.getJobID();
- IJobDescription job = jobManager.load(jobID,true);
- String outputConnectionName = job.getOutputConnectionName();
-
- ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
- if (subList == null)
- {
- subList = new ArrayList();
- outputMap.put(outputConnectionName,subList);
- }
- subList.add(new Integer(j));
+ deleteFromQueue[j] = true;
j++;
}
-
- // Now, cycle through all the output connections
- Iterator outputIterator = outputMap.keySet().iterator();
- while (outputIterator.hasNext())
+ }
+ catch (ServiceInterruption e)
+ {
+ // We don't know which failed, or maybe they all did.
+ // Go through the list of documents we just tried, and reset them on the queue based on the
+ // ServiceInterruption parameters. Then we must proceed to delete ONLY the documents that
+ // were not part of the index deletion attempt.
+ j = 0;
+ while (j < dds.getCount())
{
- String outputConnectionName = (String)outputIterator.next();
- ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
-
- String[] docClassesToRemove = new String[subList.size()];
- String[] hashedDocsToRemove = new String[subList.size()];
- DeleteQueuedDocument[] docsToDelete = new DeleteQueuedDocument[subList.size()];
- j = 0;
- while (j < subList.size())
- {
- int index = ((Integer)subList.get(j)).intValue();
- DeleteQueuedDocument dqd = (DeleteQueuedDocument)list.get(index);
- DocumentDescription ddd = dqd.getDocumentDescription();
- Long jobID = ddd.getJobID();
- IJobDescription job = jobManager.load(jobID,true);
- docClassesToRemove[j] = connectionName;
- hashedDocsToRemove[j] = ddd.getDocumentIdentifierHash();
- docsToDelete[j] = dqd;
- j++;
- }
- OutputRemoveActivity logger = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
- while (true)
- {
- try
- {
- ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,logger);
- break;
- }
- catch (ServiceInterruption e)
- {
- // No document deletions can take place while the ingestion API is down, so simply wait for it to come back up. There's
- // nothing better for this thread to be doing...
- // Wait for the prescribed time
- long amt = e.getRetryTime();
- long now = System.currentTimeMillis();
- long waittime = amt-now;
- if (waittime <= 0L)
- waittime = 300000L;
- ManifoldCF.sleep(waittime);
- }
- }
-
- // Delete the records
- DocumentDescription[] deleteDescriptions = new DocumentDescription[docsToDelete.length];
- j = 0;
- while (j < deleteDescriptions.length)
- {
- deleteDescriptions[j] = docsToDelete[j].getDocumentDescription();
- j++;
- }
- jobManager.deleteIngestedDocumentIdentifiers(deleteDescriptions);
- // Mark them as gone
- j = 0;
- while (j < docsToDelete.length)
- {
- docsToDelete[j++].wasProcessed();
- }
+ DeleteQueuedDocument cqd = docsToDelete[j];
+ DocumentDescription dd = cqd.getDocumentDescription();
+ // To recover from a deletion failure, requeue the document to COMPLETE etc.
+ jobManager.resetDeletingDocument(dd,e.getRetryTime());
+ cqd.setProcessed();
+ j++;
}
}
+ // Count the records we're actually going to delete
+ int recordCount = 0;
+ j = 0;
+ while (j < dds.getCount())
+ {
+ if (deleteFromQueue[j])
+ recordCount++;
+ j++;
+ }
+
+ // Delete the records
+ DocumentDescription[] deleteDescriptions = new DocumentDescription[recordCount];
+ j = 0;
+ recordCount = 0;
+ while (j < dds.getCount())
+ {
+ if (deleteFromQueue[j])
+ deleteDescriptions[recordCount++] = docsToDelete[j].getDocumentDescription();
+ j++;
+ }
+ jobManager.deleteIngestedDocumentIdentifiers(deleteDescriptions);
+ // Mark them as gone
+ j = 0;
+ while (j < dds.getCount())
+ {
+ if (deleteFromQueue[j])
+ docsToDelete[j].wasProcessed();
+ j++;
+ }
// Go around again
}
finally
@@ -226,7 +196,7 @@ public class DocumentDeleteThread extend
// Pop this document back into the jobqueue in an appropriate state
DocumentDescription ddd = dqd.getDocumentDescription();
// Requeue this document!
- jobManager.resetDeletingDocument(ddd);
+ jobManager.resetDeletingDocument(ddd,0L);
dqd.setProcessed();
}
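Note the asymmetry between the two rollback paths, which follows from the JobQueue
methods shown earlier: a failed delete puts the row back to COMPLETE, while a failed
cleanup puts it back to PURGATORY, and both stamp the retry gate. Summarized as a
comment, with the field effects taken from setUndeletingStatus/setUncleaningStatus
above:

  // resetDeletingDocument(dd,checkTime) -> setUndeletingStatus(id,checkTime):
  //   status    = COMPLETE
  //   checktime = checkTime  (gate for the next deletion attempt)
  //   checkaction/failtime/failcount = null
  //
  // resetCleaningDocument(dd,checkTime) -> setUncleaningStatus(id,checkTime):
  //   status    = PURGATORY
  //   checktime = checkTime  (gate for the next cleanup attempt)
  //   checkaction/failtime/failcount = null
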
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java Fri Jan 14 02:21:53 2011
@@ -135,19 +135,41 @@ public class ExpireStufferThread extends
continue;
}
- // Do the stuffing
- CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[descs.length];
+ // Do the stuffing. Each set must be segregated by job, since we need the job ID in the doc set.
+ Map jobMap = new HashMap();
int k = 0;
- while (k < docDescs.length)
+ while (k < descs.length)
{
- docDescs[k] = new CleanupQueuedDocument(descs[k],deleteFromIndex[k]);
+ CleanupQueuedDocument x = new CleanupQueuedDocument(descs[k],deleteFromIndex[k]);
+ Long jobID = descs[k].getJobID();
+ List y = (List)jobMap.get(jobID);
+ if (y == null)
+ {
+ y = new ArrayList();
+ jobMap.put(jobID,y);
+ }
+ y.add(x);
k++;
}
- DocumentCleanupSet set = new DocumentCleanupSet(docDescs);
- documentQueue.addDocuments(set);
+
+ Iterator iter = jobMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ Long jobID = (Long)iter.next();
+ IJobDescription jobDescription = jobManager.load(jobID,true);
+ List y = (List)jobMap.get(jobID);
+ CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[y.size()];
+ k = 0;
+ while (k < docDescs.length)
+ {
+ docDescs[k] = (CleanupQueuedDocument)y.get(k);
+ k++;
+ }
+ DocumentCleanupSet set = new DocumentCleanupSet(docDescs,jobDescription);
+ documentQueue.addDocuments(set);
+ }
- // If we don't wait here, the other threads don't seem to have a chance to queue anything else up.
- ManifoldCF.sleep(1000L);
+ yield();
}
catch (ManifoldCFException e)
{
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Fri Jan 14 02:21:53 2011
@@ -94,197 +94,177 @@ public class ExpireThread extends Thread
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+ IJobDescription job = dds.getJobDescription();
+ String connectionName = job.getConnectionName();
+ String outputConnectionName = job.getOutputConnectionName();
+
try
{
long currentTime = System.currentTimeMillis();
- // We need to segregate all the documents by connection, in order to be able to form a decent activities object
- // to pass into the incremental ingester. So, first pass through the document descriptions will build that.
- Map mappedDocs = new HashMap();
+ // Documents will be naturally segregated by connection, since each set comes from a single job.
+
+ // Load the repository connection shared by all documents in this (single-job) set
+ IRepositoryConnection connection = connMgr.load(connectionName);
+
+ // This is where we store the hopcount cleanup data
+ ArrayList arrayDocHashes = new ArrayList();
+ ArrayList arrayDocsToDelete = new ArrayList();
+ ArrayList arrayRelationshipTypes = new ArrayList();
+ ArrayList hopcountMethods = new ArrayList();
+
int j = 0;
while (j < dds.getCount())
{
- CleanupQueuedDocument dqd = dds.getDocument(j++);
+ CleanupQueuedDocument dqd = dds.getDocument(j);
DocumentDescription ddd = dqd.getDocumentDescription();
- Long jobID = ddd.getJobID();
- IJobDescription job = jobManager.load(jobID,true);
- String connectionName = job.getConnectionName();
- ArrayList list = (ArrayList)mappedDocs.get(connectionName);
- if (list == null)
+ if (job != null && connection != null)
{
- list = new ArrayList();
- mappedDocs.put(connectionName,list);
+ // We'll need the legal link types; grab those before we proceed
+ String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
+ if (legalLinkTypes != null)
+ {
+ arrayDocHashes.add(ddd.getDocumentIdentifierHash());
+ arrayDocsToDelete.add(dqd);
+ arrayRelationshipTypes.add(legalLinkTypes);
+ hopcountMethods.add(new Integer(job.getHopcountMode()));
+ }
}
- list.add(dqd);
+ j++;
}
- // Now, cycle through all represented connections.
- // For each connection, construct the necessary pieces to do the deletion.
- Iterator iter = mappedDocs.keySet().iterator();
- while (iter.hasNext())
+ // Grab one connection for the connectionName. If we fail, nothing is lost and retries are possible.
+ try
{
- String connectionName = (String)iter.next();
- ArrayList list = (ArrayList)mappedDocs.get(connectionName);
-
- // Produce a map of connection name->connection object. We will use this to perform a request for multiple connector objects
- IRepositoryConnection connection = connMgr.load(connectionName);
- ArrayList arrayOutputConnectionNames = new ArrayList();
- ArrayList arrayDocHashes = new ArrayList();
- ArrayList arrayDocClasses = new ArrayList();
- ArrayList arrayDocsToDelete = new ArrayList();
- ArrayList arrayRelationshipTypes = new ArrayList();
- ArrayList hopcountMethods = new ArrayList();
- ArrayList connections = new ArrayList();
- j = 0;
- while (j < list.size())
+ IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+ try
{
- CleanupQueuedDocument dqd = (CleanupQueuedDocument)list.get(j);
- DocumentDescription ddd = dqd.getDocumentDescription();
- Long jobID = ddd.getJobID();
- IJobDescription job = jobManager.load(jobID,true);
- if (job != null && connection != null)
+
+ // Iterate over the outputs
+ boolean[] deleteFromQueue = new boolean[arrayDocHashes.size()];
+
+ // Count the number of docs to actually delete. This will be a subset of the documents in the list.
+ int k = 0;
+ int removeCount = 0;
+ while (k < arrayDocHashes.size())
{
- // We'll need the legal link types; grab those before we proceed
- String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
- if (legalLinkTypes != null)
+ if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
{
- arrayOutputConnectionNames.add(job.getOutputConnectionName());
- arrayDocClasses.add(connectionName);
- arrayDocHashes.add(ddd.getDocumentIdentifierHash());
- arrayDocsToDelete.add(dqd);
- arrayRelationshipTypes.add(legalLinkTypes);
- hopcountMethods.add(new Integer(job.getHopcountMode()));
+ deleteFromQueue[k] = false;
+ removeCount++;
}
+ else
+ deleteFromQueue[k] = true;
+ k++;
}
- j++;
- }
-
- // Next, segregate the documents by output connection name. This will permit logging to know what actual activity type to use.
- HashMap outputMap = new HashMap();
- j = 0;
- while (j < arrayDocHashes.size())
- {
- String outputConnectionName = (String)arrayOutputConnectionNames.get(j);
- ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
- if (subList == null)
+
+ // Allocate removal arrays
+ String[] docClassesToRemove = new String[removeCount];
+ String[] hashedDocsToRemove = new String[removeCount];
+
+ // Now, iterate over the list
+ k = 0;
+ removeCount = 0;
+ while (k < arrayDocHashes.size())
{
- subList = new ArrayList();
- outputMap.put(outputConnectionName,subList);
+ if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ {
+ docClassesToRemove[removeCount] = connectionName;
+ hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(k);
+ removeCount++;
+ }
+ k++;
}
- subList.add(new Integer(j));
- j++;
- }
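
The replacement code that follows sizes its removal arrays with a two-pass
"count, then fill" idiom: one pass over shouldBeRemovedFromIndex() to count
the qualifying documents, a second to fill an exactly-sized array. A
self-contained sketch of the idiom, under hypothetical names:

    import java.util.List;

    // Sketch only: size an array exactly by counting matches first.
    interface CleanupDoc
    {
      boolean shouldBeRemovedFromIndex();
      String getDocumentIdentifierHash();
    }

    class RemovalArrays
    {
      static String[] hashesToRemove(List<CleanupDoc> docs)
      {
        int removeCount = 0;
        for (CleanupDoc d : docs)          // pass 1: count qualifying docs
          if (d.shouldBeRemovedFromIndex())
            removeCount++;
        String[] hashes = new String[removeCount];
        int i = 0;
        for (CleanupDoc d : docs)          // pass 2: fill the sized array
          if (d.shouldBeRemovedFromIndex())
            hashes[i++] = d.getDocumentIdentifierHash();
        return hashes;
      }
    }
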
- // Grab one connection for each connectionName. If we fail, nothing is lost and retries are possible.
- try
- {
- IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+ OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+
+ // Finally, go ahead and delete the documents from the ingestion system.
+ // If we fail, we need to put the documents back on the queue.
try
{
-
- // Iterate over the outputs
- Iterator outputIterator = outputMap.keySet().iterator();
- while (outputIterator.hasNext())
+ ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+ // Success! Label all these as needing deletion from queue.
+ k = 0;
+ while (k < arrayDocHashes.size())
{
- String outputConnectionName = (String)outputIterator.next();
- ArrayList indexList = (ArrayList)outputMap.get(outputConnectionName);
- // Count the number of docs to actually delete. This will be a subset of the documents in the list.
- int k = 0;
- int removeCount = 0;
- while (k < indexList.size())
- {
- int index = ((Integer)indexList.get(k++)).intValue();
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
- removeCount++;
- }
-
- // Allocate removal arrays
- String[] docClassesToRemove = new String[removeCount];
- String[] hashedDocsToRemove = new String[removeCount];
-
- // Now, iterate over the index list
- k = 0;
- removeCount = 0;
- while (k < indexList.size())
- {
- int index = ((Integer)indexList.get(k)).intValue();
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
- {
- docClassesToRemove[removeCount] = (String)arrayDocClasses.get(index);
- hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(index);
- removeCount++;
- }
- k++;
- }
-
- OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
-
- // Finally, go ahead and delete the documents from the ingestion system.
-
- while (true)
+ if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ deleteFromQueue[k] = true;
+ k++;
+ }
+ }
+ catch (ServiceInterruption e)
+ {
+ // We don't know which failed, or maybe they all did.
+ // Go through the list of documents we just tried, and reset them on the queue based on the
+ // ServiceInterruption parameters. Then we must proceed to delete ONLY the documents that
+ // were not part of the index deletion attempt.
+ k = 0;
+ while (k < arrayDocHashes.size())
+ {
+ CleanupQueuedDocument cqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+ if (cqd.shouldBeRemovedFromIndex())
{
- try
+ DocumentDescription dd = cqd.getDocumentDescription();
+ if (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
+ dd.getFailRetryCount() == 0)
{
- ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
- break;
+ // Treat this as a hard failure.
+ if (e.isAbortOnFail())
+ throw new ManifoldCFException("Repeated service interruptions - failure expiring document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
}
- catch (ServiceInterruption e)
+ else
{
- // If we get a service interruption here, it means that the ingestion API is down.
- // There is no point, therefore, in freeing up this thread to go do something else;
- // might as well just wait here for our retries.
- // Wait for the prescribed time
- long amt = e.getRetryTime();
- long now = System.currentTimeMillis();
- long waittime = amt-now;
- if (waittime <= 0L)
- waittime = 300000L;
- ManifoldCF.sleep(waittime);
+ // To recover from an expiration failure, requeue the document to a PENDING state for a later retry.
+ jobManager.resetDocument(dd,e.getRetryTime(),
+ IJobManager.ACTION_REMOVE,e.getFailTime(),e.getFailRetryCount());
+ cqd.setProcessed();
}
}
-
- // Successfully deleted some documents from ingestion system. Now, remove them from job queue. This
- // must currently happen one document at a time, because the jobs and connectors for each document
- // potentially differ.
- k = 0;
- while (k < indexList.size())
- {
- int index = ((Integer)indexList.get(k)).intValue();
-
- CleanupQueuedDocument dqd = (CleanupQueuedDocument)arrayDocsToDelete.get(index);
- DocumentDescription ddd = dqd.getDocumentDescription();
- Long jobID = ddd.getJobID();
- int hopcountMethod = ((Integer)hopcountMethods.get(index)).intValue();
- String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(index);
- DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
- // Use the common method for doing the requeuing
- ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
- connector,connection,queueTracker,currentTime);
- // Finally, completed expiration of the document.
- dqd.setProcessed();
- k++;
- }
+ k++;
}
}
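
For contrast: the removed lines above waited in place when the ingestion API
was down, treating the ServiceInterruption retry time as an absolute
timestamp and sleeping for the difference, with a five-minute floor when
that time had already passed. That computation, restated as a small sketch:

    class RetryWait
    {
      // Sketch only: the sleep interval the removed in-place retry computed.
      static long computeWaitMillis(long retryTime)
      {
        long now = System.currentTimeMillis();
        long waittime = retryTime - now;
        if (waittime <= 0L)
          waittime = 300000L;  // retry time already past: default to 5 minutes
        return waittime;
      }
    }

The refactored code drops the blocking wait entirely: jobManager.resetDocument()
records the retry time on the document itself and the thread moves on, so it
is no longer tied up sleeping on a failed output connection.
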
- finally
+
+ // Successfully deleted some documents from the ingestion system. Now, remove them from the job queue. This
+ // must currently happen one document at a time, because the job (and thus the hopcount method and
+ // legal link types) for each document potentially differs.
+ k = 0;
+ while (k < arrayDocHashes.size())
{
- // Free up the reserved connector instance
- RepositoryConnectorFactory.release(connector);
+ if (deleteFromQueue[k])
+ {
+ CleanupQueuedDocument dqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+ DocumentDescription ddd = dqd.getDocumentDescription();
+ Long jobID = ddd.getJobID();
+ int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
+ String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
+ DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+ // Use the common method for doing the requeuing
+ ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
+ connector,connection,queueTracker,currentTime);
+ // Finally, completed expiration of the document.
+ dqd.setProcessed();
+ }
+ k++;
}
}
- catch (ManifoldCFException e)
+ finally
{
- if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
- {
- // This error can only come from grabbing the connections. So, if this occurs it means that
- // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
- Logging.threads.warn("Expire thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
+ // Free up the reserved connector instance
+ RepositoryConnectorFactory.release(connector);
+ }
+ }
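
The grab()/release() pairing above is the standard pooled-resource
discipline: the finally block guarantees the connector instance is returned
to the pool even when the deletion work throws. In outline (sketch only,
with a hypothetical pool type):

    // Sketch only: guarantee release of a pooled instance.
    interface ConnectorPool
    {
      Object grab() throws Exception;
      void release(Object instance);
    }

    class PooledWork
    {
      static void runWithConnector(ConnectorPool pool) throws Exception
      {
        Object connector = pool.grab();
        try
        {
          // ... use the connector for the whole batch ...
        }
        finally
        {
          // Runs on success, exception, or early return alike.
          pool.release(connector);
        }
      }
    }
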
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
+ {
+ // This error can only come from grabbing the connections. So, if this occurs it means that
+ // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
+ Logging.threads.warn("Expire thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
- // Let the unprocessed documents get requeued! This is handled at the end of the loop...
- }
- else
- throw e;
+ // Let the unprocessed documents get requeued! This is handled at the end of the loop...
}
+ else
+ throw e;
}
}
catch (ManifoldCFException e)
@@ -295,7 +275,15 @@ public class ExpireThread extends Thread
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
throw e;
- Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+ // It's ok to abort a job because we can't talk to the search engine.
+ if (jobManager.errorAbort(dds.getJobDescription().getID(),e.getMessage()))
+ {
+ // An exception occurred in the processing of a set of documents.
+ // Shut the corresponding job down, with an appropriate error.
+
+ // If an error was already recorded for this job, we eat the exception silently.
+ Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+ }
}
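
The new error handling aborts the owning job, and logs only when
errorAbort() reports that this call actually recorded the error; judging
from the surrounding comments, later failures for the same job are eaten
silently. A sketch of that abort-once idiom, with hypothetical types and an
assumed boolean contract for errorAbort():

    // Sketch only: errorAbort() is assumed to return true only for the
    // first recorded error on the job.
    interface JobManager
    {
      boolean errorAbort(Long jobID, String message);
    }

    class AbortOnce
    {
      static void handle(JobManager jobManager, Long jobID, Exception e)
      {
        if (jobManager.errorAbort(jobID, e.getMessage()))
        {
          // This call recorded the abort, so log the root cause once.
          System.err.println("Exception tossed: " + e.getMessage());
          e.printStackTrace(System.err);
        }
        // Otherwise an error was already recorded; eat this one silently.
      }
    }
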
finally
{