You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/10 03:35:42 UTC
svn commit: r1057076 [1/2] - in /incubator/lcf/trunk: ./
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/
framework/pull-agent/src/main/java/org/apache/m...
Author: kwright
Date: Mon Jan 10 02:35:42 2011
New Revision: 1057076
URL: http://svn.apache.org/viewvc?rev=1057076&view=rev
Log:
Fix for CONNECTORS-146. Introduce a new flow for documents being cleaned up, as opposed to being deleted as part of a job's deletion. This furthermore allows us to fix a problem with carrydown data being affected by document cleanup.
Added:
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java (with props)
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java (with props)
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java (with props)
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java (with props)
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java (with props)
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java (with props)
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (with props)
Modified:
incubator/lcf/trunk/CHANGES.txt
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
Modified: incubator/lcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/CHANGES.txt?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/CHANGES.txt (original)
+++ incubator/lcf/trunk/CHANGES.txt Mon Jan 10 02:35:42 2011
@@ -14,6 +14,10 @@ forget the index state for an output con
======================= Release 0.1 =======================
Release Date: See http://incubator.apache.org/connectors for the official release date.
+CONNECTORS-146: Problem with document cleanup logic would cause data corruption
+in carrydown data and in hopcount information.
+(Karl Wright)
+
CONNECTORS-143: Copyright notice needs to be changed to 2011.
(Sebb, Karl Wright)
Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,49 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.interfaces;
+
+/** This class describes a set of documents and an associated boolean flag for each: flags[i] goes with documentSet[i].
+* Both arrays are stored and handed back by reference; nothing here copies them or checks that their lengths agree. */
+public class DocumentSetAndFlags
+{
+ public static final String _rcsid = "@(#)$Id$";
+
+ protected DocumentDescription[] documentSet; // The documents, in the order supplied to the constructor
+ protected boolean[] flags; // One flag per entry of documentSet, at the same index
+
+ /** Constructor. Keeps references to the supplied arrays as-is (no defensive copy, no length check). */
+ public DocumentSetAndFlags(DocumentDescription[] documentSet, boolean[] flags)
+ {
+ this.documentSet = documentSet;
+ this.flags = flags;
+ }
+
+ /** Get the document set. This is the same array instance that was passed to the constructor. */
+ public DocumentDescription[] getDocumentSet()
+ {
+ return documentSet;
+ }
+
+ /** Get the flags. This is the same array instance that was passed to the constructor. */
+ public boolean[] getFlags()
+ {
+ return flags;
+ }
+
+}
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
------------------------------------------------------------------------------
svn:keywords = Id
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Jan 10 02:35:42 2011
@@ -3,7 +3,7 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
+* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -158,6 +158,11 @@ public interface IJobManager
public void resetDocDeleteWorkerStatus()
throws ManifoldCFException;
+ /** Reset as part of restoring doc cleanup threads.
+ *@throws ManifoldCFException on error. */
+ public void resetDocCleanupWorkerStatus()
+ throws ManifoldCFException;
+
/** Reset as part of restoring startup threads.
*/
public void resetStartupWorkerStatus()
@@ -365,6 +370,23 @@ public interface IJobManager
public void resetDeletingDocument(DocumentDescription documentDescription)
throws ManifoldCFException;
+ /** Reset a cleaning document back to its former state.
+ * This gets done when a cleanup thread sees a service interruption, etc., from the ingestion system.
+ *@param documentDescription is the description object for the document that was being cleaned. */
+ public void resetCleaningDocument(DocumentDescription documentDescription)
+ throws ManifoldCFException;
+
+ /** Reset a set of cleaning documents for further processing in the future.
+ * This method is called after some unknown number of the documents were cleaned, but then an ingestion service interruption occurred.
+ * Note well: The logic here basically presumes that we cannot know whether the documents were indeed cleaned or not.
+ * If we knew for a fact that none of the documents had been handled, it would be possible to look at each document's
+ * current status and decide what the new status ought to be, based on a true rollback scenario. Such cases, however, are rare enough so that
+ * special logic is probably not worth it.
+ *@param documentDescriptions is the set of description objects for the documents that were being cleaned.
+ */
+ public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+ throws ManifoldCFException;
+
/** Add an initial set of documents to the queue.
* This method is called during job startup, when the queue is being loaded.
* A set of document references is passed to this method, which updates the status of the document
@@ -727,6 +749,14 @@ public interface IJobManager
public DocumentDescription[] getNextDeletableDocuments(int n)
throws ManifoldCFException;
+ /** Get list of cleanable document descriptions. This list will take into account
+ * multiple jobs that may own the same document.
+ *@param n is the maximum number of documents to return.
+ *@return the document descriptions for these documents, each paired with a boolean flag at the same index.
+ */
+ public DocumentSetAndFlags getNextCleanableDocuments(int n)
+ throws ManifoldCFException;
+
/** Delete ingested document identifiers (as part of deleting the owning job).
* The number of identifiers specified is guaranteed to be less than the maxInClauseCount
* for the database.
@@ -755,8 +785,12 @@ public interface IJobManager
public void finishJobs()
throws ManifoldCFException;
- /** Reset eligible jobs back to "inactive" state. This method is used to pick up all jobs in the shutting down state
- * whose purgatory records have been all cleaned up.
+ /** Reset eligible jobs either back to the "inactive" state, or make them active again. The
+ * latter will occur if the cleanup phase of the job generated more pending documents.
+ *
+ * This method is used to pick up all jobs in the shutting down state
+ * whose purgatory or being-cleaned records have been all processed.
+ *
*@param currentTime is the current time in milliseconds since epoch.
*@param resetJobs is filled in with the set of IJobDescription objects that were reset.
*/
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java Mon Jan 10 02:35:42 2011
@@ -506,7 +506,7 @@ public class HopCount extends org.apache
}
/** Remove a set of document identifiers specified as a criteria. This will remove hopcount rows and
- * also intrinsic links that have the specified document identifiers are sources.
+ * also intrinsic links that have the specified document identifiers as sources.
*/
public void deleteMatchingDocuments(Long jobID, String[] legalLinkTypes,
String sourceTableName,
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Jan 10 02:35:42 2011
@@ -740,6 +740,16 @@ public class JobManager implements IJobM
Logging.jobs.debug("Reset complete");
}
+ /** Reset as part of restoring doc cleanup threads.
+ * Delegates to the job queue's resetDocCleanupWorkerStatus(), writing a debug log line before and after. */
+ public void resetDocCleanupWorkerStatus()
+ throws ManifoldCFException
+ {
+ Logging.jobs.debug("Resetting doc cleaning status");
+ jobQueue.resetDocCleanupWorkerStatus();
+ Logging.jobs.debug("Reset complete");
+ }
+
/** Reset as part of restoring startup threads.
*/
public void resetStartupWorkerStatus()
@@ -765,6 +775,198 @@ public class JobManager implements IJobM
// carrydown records get removed when the job itself is removed.
}
+ /** Get list of cleanable document descriptions. This list will take into account
+ * multiple jobs that may own the same document. All documents for which a description
+ * is returned will be transitioned to the "beingcleaned" state. No jobqueue entries are
+ * deleted here; instead each description is paired with a flag that is true only when
+ * getUnindexableDocumentIdentifiers() returned its hash, i.e. it may be removed from the index.
+ *@param maxCount is the maximum number of documents to return.
+ *@return the document descriptions for these documents, with one parallel flag per description.
+ */
+ public DocumentSetAndFlags getNextCleanableDocuments(int maxCount)
+ throws ManifoldCFException
+ {
+ // The query will be built here, because it joins the jobs table against the jobqueue
+ // table.
+ //
+ // This query must only pick up documents that are not active in any job and
+ // which belong to a job that's in a "shutting down" state and are in
+ // a "purgatory" state.
+ //
+ // We are in fact more conservative in this query than we need to be; the documents
+ // excluded will include some that simply match our criteria, which is designed to
+ // be fast rather than perfect. The match we make is: hashvalue against hashvalue, and
+ // different job id's.
+ //
+ // SELECT id,jobid,hashval,docid,failtime,failcount FROM jobqueue t0 WHERE t0.status=[purgatory] AND EXISTS(SELECT 'x' FROM
+ // jobs t1 WHERE t0.jobid=t1.id AND t1.status=[shutting down])
+ // AND NOT EXISTS(SELECT 'x' FROM jobqueue t2 WHERE t0.hashval=t2.hashval AND t0.jobid!=t2.jobid
+ // AND t2.status IN ([the four active statuses],[being deleted],[being cleaned]))
+ //
+
+ // Do a simple preliminary query, since the big query is currently slow, so that we don't waste time during stasis or
+ // ingestion.
+ // Moved outside of transaction, so we have no chance of locking up job status cache key for an extended period of time.
+ if (!jobs.cleaningJobsPresent())
+ return new DocumentSetAndFlags(new DocumentDescription[0],new boolean[0]);
+
+ long startTime = 0L;
+ if (Logging.perf.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ Logging.perf.debug("Waiting to find documents to put on the cleaning queue");
+ }
+
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("After "+Long.toString(System.currentTimeMillis()-startTime)+" ms, beginning query to look for documents to put on cleaning queue");
+
+ // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being cleaned".
+ ArrayList list = new ArrayList();
+ list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
+
+ list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
+
+ list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+
+ IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
+ jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
+ jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+"=? "+
+ " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
+ " AND t1."+jobs.statusField+"=?"+
+ ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
+ jobQueue.docHashField+" AND t0."+jobQueue.jobIDField+"!=t2."+jobQueue.jobIDField+
+ " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
+ list,null,null,maxCount,null);
+
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Done getting docs to cleaning queue after "+Long.toString(System.currentTimeMillis()-startTime)+" ms.");
+
+ // We need to organize the returned set by connection name, so that we can efficiently
+ // use getUnindexableDocumentIdentifiers.
+ // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
+ // objects.
+ HashMap connectionNameMap = new HashMap();
+ HashMap documentIDMap = new HashMap();
+ int i = 0;
+ while (i < set.getRowCount())
+ {
+ IResultRow row = set.getRow(i);
+ Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+ String documentIDHash = (String)row.getValue(jobQueue.docHashField);
+ String documentID = (String)row.getValue(jobQueue.docIDField);
+ Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
+ Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+ // Failtime is probably not useful in this context, but we'll bring it along for completeness
+ long failTime;
+ if (failTimeValue == null)
+ failTime = -1L;
+ else
+ failTime = failTimeValue.longValue();
+ int failCount;
+ if (failCountValue == null)
+ failCount = 0;
+ else
+ failCount = (int)failCountValue.longValue();
+ IJobDescription jobDesc = load(jobID);
+ String connectionName = jobDesc.getConnectionName();
+ DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+ jobID,documentIDHash,documentID,failTime,failCount);
+ documentIDMap.put(documentIDHash,dd); // NOTE(review): keyed by hash only, so rows from different jobs sharing a hash collapse to one entry - confirm intended
+ ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+ if (x == null)
+ {
+ // New entry needed
+ x = new ArrayList();
+ connectionNameMap.put(connectionName,x);
+ }
+ x.add(dd);
+ i++;
+ }
+
+ // For each bin, obtain a filtered answer, and enter all answers into a hash table.
+ // We'll then scan the result again to look up the right descriptions for return,
+ // and flag the ones whose hash came back from the filter.
+ HashMap allowedDocIds = new HashMap();
+ Iterator iter = connectionNameMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String connectionName = (String)iter.next();
+ ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+ // Do the filter query
+ DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+ int j = 0;
+ while (j < descriptions.length)
+ {
+ descriptions[j] = (DocumentDescription)x.get(j);
+ j++;
+ }
+ String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName);
+ j = 0;
+ while (j < docIDHashes.length)
+ {
+ String docIDHash = docIDHashes[j++];
+ allowedDocIds.put(docIDHash,docIDHash);
+ }
+ }
+
+ // Now, assemble a result, and change the state of the records accordingly
+ DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+ boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+ i = 0;
+ iter = documentIDMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String docIDHash = (String)iter.next();
+ DocumentDescription dd = (DocumentDescription)documentIDMap.get(docIDHash);
+ // Determine whether we can delete it from the index or not
+ rvalBoolean[i] = (allowedDocIds.get(docIDHash) != null);
+ // Set the record status to "being cleaned" and return it
+ rval[i++] = dd;
+ jobQueue.setCleaningStatus(dd.getID());
+ }
+
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Done pruning unindexable docs after "+Long.toString(System.currentTimeMillis()-startTime)+" ms.");
+
+ return new DocumentSetAndFlags(rval,rvalBoolean);
+
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction finding cleanable docs: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+
/** Get list of deletable document descriptions. This list will take into account
* multiple jobs that may own the same document. All documents for which a description
* is returned will be transitioned to the "beingdeleted" state. Documents which are
@@ -789,9 +991,8 @@ public class JobManager implements IJobM
// be fast rather than perfect. The match we make is: hashvalue against hashvalue, and
// different job id's.
//
- // SELECT id,jobid,docid FROM jobqueue t0 WHERE ((t0.status IN ('C','P','G') AND EXISTS(SELECT 'x' FROM
- // jobs t1 WHERE t0.jobid=t1.id AND t1.status='D')) OR (t0.status='P' AND EXISTS(SELECT 'x' FROM
- // jobs t3 WHERE t0.jobid=t3.id AND t3.status='X')))
+ // SELECT id,jobid,docid FROM jobqueue t0 WHERE (t0.status IN ('C','P','G') AND EXISTS(SELECT 'x' FROM
+ // jobs t1 WHERE t0.jobid=t1.id AND t1.status='D')
// AND NOT EXISTS(SELECT 'x' FROM jobqueue t2 WHERE t0.hashval=t2.hashval AND t0.jobid!=t2.jobid
// AND t2.status IN ('A','F','B'))
//
@@ -827,27 +1028,21 @@ public class JobManager implements IJobM
list.add(jobs.statusToString(jobs.STATUS_READYFORDELETE));
- list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
-
- list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
-
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
- jobQueue.getTableName()+" t0 WHERE ((t0."+jobQueue.statusField+" IN (?,?,?) "+
+ jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+" IN (?,?,?) "+
" AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
" AND t1."+jobs.statusField+"=?"+
- ")) OR (t0."+jobQueue.statusField+"=?"+
- " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t3 WHERE t0."+jobQueue.jobIDField+"=t3."+jobs.idField+
- " AND t3."+jobs.statusField+"=?"+
- "))) AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
+ ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
jobQueue.docHashField+" AND t0."+jobQueue.jobIDField+"!=t2."+jobQueue.jobIDField+
- " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
+ " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
list,null,null,maxCount,null);
if (Logging.perf.isDebugEnabled())
@@ -1329,6 +1524,7 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
StringBuffer sb = new StringBuffer("SELECT t0.");
sb.append(jobQueue.idField).append(",t0.");
@@ -1346,7 +1542,7 @@ public class JobManager implements IJobM
sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE t0.")
.append(jobQueue.docHashField).append("=t2.").append(jobQueue.docHashField).append(" AND t0.")
.append(jobQueue.jobIDField).append("!=t2.").append(jobQueue.jobIDField).append(" AND t2.")
- .append(jobQueue.statusField).append(" IN (?,?,?,?,?))");
+ .append(jobQueue.statusField).append(" IN (?,?,?,?,?,?))");
sb.append(" ").append(database.constructOffsetLimitClause(0,n));
// Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
@@ -1775,6 +1971,7 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
sb.append("t0.").append(jobQueue.checkTimeField).append("<=? AND ");
sb.append("(t0.").append(jobQueue.checkActionField).append(" IS NULL OR t0.").append(jobQueue.checkActionField)
@@ -1785,7 +1982,7 @@ public class JobManager implements IJobM
sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE t0.")
.append(jobQueue.docHashField).append("=t2.").append(jobQueue.docHashField).append(" AND t0.")
.append(jobQueue.jobIDField).append("!=t2.").append(jobQueue.jobIDField).append(" AND t2.")
- .append(jobQueue.statusField).append(" IN (?,?,?,?,?)) AND ");
+ .append(jobQueue.statusField).append(" IN (?,?,?,?,?,?)) AND ");
// Prerequisite event clause: AND NOT EXISTS(SELECT 'x' FROM prereqevents t3,events t4 WHERE t3.ownerid=t0.id AND t3.name=t4.name)
sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.prereqEventManager.getTableName()).append(" t3,").append(eventManager.getTableName()).append(" t4 WHERE t0.")
@@ -2487,6 +2684,97 @@ public class JobManager implements IJobM
}
}
+ /** Reset a set of cleaning documents for further processing in the future.
+ * This method is called after some unknown number of the documents were cleaned, but then an ingestion service interruption occurred.
+ * Note well: The logic here basically presumes that we cannot know whether the documents were indeed cleaned or not.
+ * If we knew for a fact that none of the documents had been handled, it would be possible to look at each document's
+ * current status and decide what the new status ought to be, based on a true rollback scenario. Such cases, however, are rare enough so that
+ * special logic is probably not worth it.
+ *@param documentDescriptions is the set of description objects for the documents that were being cleaned.
+ */
+ public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+ throws ManifoldCFException
+ {
+ Long[] ids = new Long[documentDescriptions.length];
+ String[] docIDHashes = new String[documentDescriptions.length];
+
+ // First loop builds a "hash:jobID" key for each document and maps that key back to the document's index in the input array.
+ HashMap indexMap = new HashMap();
+ int i = 0;
+ while (i < documentDescriptions.length)
+ {
+ docIDHashes[i] =documentDescriptions[i].getDocumentIdentifierHash() + ":" + documentDescriptions[i].getJobID();
+ indexMap.put(docIDHashes[i],new Integer(i));
+ i++;
+ }
+
+ // Sort the keys, so that the updates below are always issued in one consistent order.
+ java.util.Arrays.sort(docIDHashes);
+
+ // Next loop populates the actual arrays we use to feed the operation so that the ordering is correct.
+ i = 0;
+ while (i < docIDHashes.length)
+ {
+ String docIDHash = docIDHashes[i];
+ Integer x = (Integer)indexMap.remove(docIDHash);
+ if (x == null)
+ throw new ManifoldCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
+ int index = x.intValue();
+ ids[i] = documentDescriptions[index].getID();
+ i++;
+ }
+
+ // Documents get marked PURGATORY regardless of their current state; this is because we can't know at this point what the actual prior state was.
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
+ {
+ // Going through ids in order should greatly reduce or eliminate chances of deadlock occurring. We thus need to pay attention to the sorted order.
+ i = 0;
+ while (i < ids.length)
+ {
+ jobQueue.setUncleaningStatus(ids[i]);
+ i++;
+ }
+
+ break;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction resetting cleaning documents: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+
+ /** Reset a single cleaning document, by handing it to resetCleaningDocumentMultiple() as a one-element array.
+ * This gets done when a cleanup thread sees a service interruption, etc., from the ingestion system.
+ */
+ public void resetCleaningDocument(DocumentDescription documentDescription)
+ throws ManifoldCFException
+ {
+ resetCleaningDocumentMultiple(new DocumentDescription[]{documentDescription});
+ }
+
/** Reset a set of deleting documents for further processing in the future.
* This method is called after some unknown number of the documents were deleted, but then an ingestion service interruption occurred.
* Note well: The logic here basically presumes that we cannot know whether the documents were indeed processed or not.
@@ -5113,7 +5401,7 @@ public class JobManager implements IJobM
// This method must find only jobs that have nothing hanging around in their jobqueue that represents an ingested
// document. Any jobqueue entries which are in a state to interfere with the delete will be cleaned up by other
// threads, so eventually a job will become eligible. This happens when there are no records that have an ingested
- // status: complete, purgatory, being-deleted, or pending purgatory.
+ // status: complete, purgatory, being-cleaned, being-deleted, or pending purgatory.
database.beginTransaction();
try
{
@@ -5152,12 +5440,15 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
list.add(jobID);
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobID);
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
IResultSet confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
jobQueue.getTableName()+" WHERE "+
"("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
"("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
"("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
+ "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
"("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) "+database.constructOffsetLimitClause(0,1),list,null,null,1,null);
if (confirmSet.getRowCount() > 0)
@@ -5438,8 +5729,12 @@ public class JobManager implements IJobM
}
}
- /** Reset eligible jobs back to "inactive" state. This method is used to pick up all jobs in the shutting down state
- * whose purgatory or being-deleted records have been all cleaned up.
+ /** Reset eligible jobs either back to the "inactive" state, or make them active again. The
+ * latter will occur if the cleanup phase of the job generated more pending documents.
+ *
+ * This method is used to pick up all jobs in the shutting down state
+ * whose purgatory or being-cleaned records have been all processed.
+ *
*@param currentTime is the current time in milliseconds since epoch.
*@param resetJobs is filled in with the set of IJobDescription objects that were reset.
*/
@@ -5477,7 +5772,7 @@ public class JobManager implements IJobM
list.add(jobID);
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
list.add(jobID);
- list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
IResultSet confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
jobQueue.getTableName()+" WHERE "+
@@ -5487,14 +5782,40 @@ public class JobManager implements IJobM
if (confirmSet.getRowCount() > 0)
continue;
- IJobDescription jobDesc = jobs.load(jobID,true);
- resetJobs.add(jobDesc);
-
- // Label the job "finished"
- jobs.finishJob(jobID,currentTime);
- if (Logging.jobs.isDebugEnabled())
+ // The shutting-down phase is complete. However, we need to check if there are any outstanding
+ // PENDING or PENDINGPURGATORY records before we can decide what to do.
+ list.clear();
+ list.add(jobID);
+ list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
+ list.add(jobID);
+ list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
+
+ confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
+ jobQueue.getTableName()+" WHERE "+
+ "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
+ "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) "+database.constructOffsetLimitClause(0,1),list,null,null,1,null);
+
+ if (confirmSet.getRowCount() > 0)
{
- Logging.jobs.debug("Job "+jobID+" now completed");
+ // This job needs to re-enter the active state. Make that happen.
+ jobs.returnJobToActive(jobID);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Job "+jobID+" is re-entering active state");
+ }
+ }
+ else
+ {
+ // This job should be marked as finished.
+ IJobDescription jobDesc = jobs.load(jobID,true);
+ resetJobs.add(jobDesc);
+
+ // Label the job "finished"
+ jobs.finishJob(jobID,currentTime);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Job "+jobID+" now completed");
+ }
}
}
return;
@@ -5804,6 +6125,7 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
@@ -5857,6 +6179,7 @@ public class JobManager implements IJobM
.append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Processed'")
.append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Processed'")
.append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Being removed'")
+ .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Being removed'")
.append(" ELSE 'Unknown'")
.append(" END AS state,")
.append("CASE")
@@ -5982,6 +6305,7 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
list.add(jobQueue.actionToString(jobQueue.ACTION_RESCAN));
list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
@@ -6035,6 +6359,7 @@ public class JobManager implements IJobM
.append("CASE")
.append(" WHEN ")
.append(jobQueue.statusField).append("=?")
+ .append(" OR ").append(jobQueue.statusField).append("=?")
.append(" THEN 1 ELSE 0")
.append(" END")
.append(" as deleting,")
@@ -6177,9 +6502,10 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
- sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?,?,?,?,?)");
+ sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?,?,?,?,?,?)");
break;
}
k++;
@@ -6221,7 +6547,8 @@ public class JobManager implements IJobM
break;
case DOCSTATUS_DELETING:
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
- sb.append(fieldPrefix).append(jobQueue.statusField).append("=?");
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+ sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?)");
break;
case DOCSTATUS_READYFORPROCESSING:
list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Jan 10 02:35:42 2011
@@ -45,6 +45,7 @@ public class JobQueue extends org.apache
public final static int STATUS_BEINGDELETED = 6;
public final static int STATUS_ACTIVENEEDRESCAN = 7;
public final static int STATUS_ACTIVENEEDRESCANPURGATORY = 8;
+ public final static int STATUS_BEINGCLEANED = 9;
// Action values
public final static int ACTION_RESCAN = 0;
@@ -66,9 +67,12 @@ public class JobQueue extends org.apache
// an aborted job. On recovery, PENDING and ACTIVE records are deleted (since they were never
// completed), while PENDINGPURGATORY and ACTIVEPURGATORY records are retained but get marked as PURGATORY.
//
- // BEINGDELETED means that the document is queued to be cleaned up because the owning job is being
+ // BEINGDELETED means that the document is queued because the owning job is being
// deleted. It exists so that jobs that are active can avoid processing a document until the cleanup
// activity is done.
+ //
+ // BEINGCLEANED means that the document is queued because the owning job is in the SHUTTINGDOWN
+ // state, and the document was never encountered during the crawl.
// Field names
public static final String idField = "id";
@@ -98,6 +102,7 @@ public class JobQueue extends org.apache
statusMap.put("D",new Integer(STATUS_BEINGDELETED));
statusMap.put("a",new Integer(STATUS_ACTIVENEEDRESCAN));
statusMap.put("f",new Integer(STATUS_ACTIVENEEDRESCANPURGATORY));
+ statusMap.put("d",new Integer(STATUS_BEINGCLEANED));
}
protected static Map seedstatusMap;
@@ -307,6 +312,12 @@ public class JobQueue extends org.apache
list.add(statusToString(STATUS_BEINGDELETED));
performUpdate(map,"WHERE "+statusField+"=?",list,null);
+ // Map BEINGCLEANED to PURGATORY
+ map.put(statusField,statusToString(STATUS_PURGATORY));
+ list.clear();
+ list.add(statusToString(STATUS_BEINGCLEANED));
+ performUpdate(map,"WHERE "+statusField+"=?",list,null);
+
// Map newseed fields to seed
map.put(isSeedField,seedstatusToString(SEEDSTATUS_SEED));
list.clear();
@@ -376,6 +387,20 @@ public class JobQueue extends org.apache
performUpdate(map,"WHERE "+statusField+"=?",list,null);
}
+ /** Reset doc cleaning worker status. Every record whose status is BEINGCLEANED
+ * is set back to PURGATORY; the update is not restricted to any one job.
+ */
+ public void resetDocCleanupWorkerStatus()
+ throws ManifoldCFException
+ {
+ HashMap map = new HashMap();
+ ArrayList list = new ArrayList();
+ // Map BEINGCLEANED to PURGATORY (the list was just created, so the clear() below changes nothing)
+ map.put(statusField,statusToString(STATUS_PURGATORY));
+ list.clear();
+ list.add(statusToString(STATUS_BEINGCLEANED));
+ performUpdate(map,"WHERE "+statusField+"=?",list,null);
+ }
+
/** Prepare for a "full scan" job. This will not be called
* unless the job is in the "INACTIVE" state.
* This does the following:
@@ -642,6 +667,34 @@ public class JobQueue extends org.apache
performUpdate(map,"WHERE "+idField+"=?",list,null);
}
+ /** Set the status of a document to "being cleaned".
+ * Only the status field of the matching record is updated. Afterwards
+ * noteModifications(0,1,0) is called; presumably this reports one modified
+ * row to the table statistics logic (TODO confirm against noteModifications).
+ *@param id is the value matched against the id field of the record to update.
+ */
+ public void setCleaningStatus(Long id)
+ throws ManifoldCFException
+ {
+ ArrayList list = new ArrayList();
+ list.add(id);
+ HashMap map = new HashMap();
+ map.put(statusField,statusToString(STATUS_BEINGCLEANED));
+ performUpdate(map,"WHERE "+idField+"=?",list,null);
+ noteModifications(0,1,0);
+ }
+
+ /** Set the status of a document to be "no longer cleaning".
+ * The record's status becomes PURGATORY, and its check time, check action,
+ * fail time, and fail count fields are all set to null.
+ *@param id is the value matched against the id field of the record to update.
+ */
+ public void setUncleaningStatus(Long id)
+ throws ManifoldCFException
+ {
+ HashMap map = new HashMap();
+ map.put(statusField,statusToString(STATUS_PURGATORY));
+ map.put(checkTimeField,null);
+ map.put(checkActionField,null);
+ map.put(failTimeField,null);
+ map.put(failCountField,null);
+ ArrayList list = new ArrayList();
+ list.add(id);
+ performUpdate(map,"WHERE "+idField+"=?",list,null);
+ }
+
/** Remove multiple records entirely.
*@param ids is the set of job queue id's
*/
@@ -715,12 +768,14 @@ public class JobQueue extends org.apache
case STATUS_ACTIVEPURGATORY:
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
+ case STATUS_BEINGCLEANED:
+ // These are all the active states. Being in this state implies that a thread may be working on the document. We
+ // must not interrupt it.
// Initial adds never bring along any carrydown info, so we should be satisfied as long as the record exists.
break;
case STATUS_COMPLETE:
case STATUS_PURGATORY:
- case STATUS_BEINGDELETED:
// Set the status and time both
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
if (desiredExecuteTime == -1L)
@@ -996,6 +1051,7 @@ public class JobQueue extends org.apache
rval = true;
break;
case STATUS_COMPLETE:
+ case STATUS_BEINGCLEANED:
// Requeue the document for processing, if there have been other changes.
if (otherChangesSeen)
{
@@ -1222,6 +1278,8 @@ public class JobQueue extends org.apache
return "a";
case STATUS_ACTIVENEEDRESCANPURGATORY:
return "f";
+ case STATUS_BEINGCLEANED:
+ return "d";
default:
throw new ManifoldCFException("Bad status value: "+Integer.toString(status));
}
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Mon Jan 10 02:35:42 2011
@@ -1115,6 +1115,67 @@ public class Jobs extends org.apache.man
performUpdate(map,"WHERE "+idField+"=?",list,new StringSet(getJobStatusKey()));
}
+ /** Put job back into active state, from the shutting-down state.
+ * The job's row is read FOR UPDATE inside a transaction. The new status depends on
+ * what checkConnectorExists() reports for the job's connection name and output name:
+ * both present gives ACTIVE, connection only gives ACTIVE_NOOUTPUT, output only gives
+ * ACTIVE_UNINSTALLED, and neither gives ACTIVE_NEITHER.
+ * A ManifoldCFException is thrown if the job row cannot be found, or if the job is in
+ * any state other than SHUTTINGDOWN.
+ * NOTE(review): rollback is signalled for ManifoldCFException and Error only; an unchecked
+ * RuntimeException would reach endTransaction() without signalRollback() -- confirm this is intended.
+ *@param jobID is the job identifier.
+ */
+ public void returnJobToActive(Long jobID)
+ throws ManifoldCFException
+ {
+ beginTransaction();
+ try
+ {
+ ArrayList list = new ArrayList();
+ list.add(jobID);
+ IResultSet set = performQuery("SELECT "+statusField+","+connectionNameField+","+outputNameField+" FROM "+getTableName()+" WHERE "+
+ idField+"=? FOR UPDATE",list,null,null);
+ if (set.getRowCount() == 0)
+ throw new ManifoldCFException("Can't find job "+jobID.toString());
+ IResultRow row = set.getRow(0);
+ int status = stringToStatus((String)row.getValue(statusField));
+ int newStatus;
+ switch (status)
+ {
+ case STATUS_SHUTTINGDOWN:
+ if (connectionMgr.checkConnectorExists((String)row.getValue(connectionNameField)))
+ {
+ if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
+ newStatus = STATUS_ACTIVE;
+ else
+ newStatus = STATUS_ACTIVE_NOOUTPUT;
+ }
+ else
+ {
+ if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
+ newStatus = STATUS_ACTIVE_UNINSTALLED;
+ else
+ newStatus = STATUS_ACTIVE_NEITHER;
+ }
+ break;
+ default:
+ // Complain!
+ throw new ManifoldCFException("Unexpected job status encountered: "+Integer.toString(status));
+ }
+
+ HashMap map = new HashMap();
+ map.put(statusField,statusToString(newStatus));
+ performUpdate(map,"WHERE "+idField+"=?",list,new StringSet(getJobStatusKey()));
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
+ }
+
/** Make job active, and set the start time field.
*@param jobID is the job identifier.
*@param startTime is the current time in milliseconds from start of epoch.
@@ -1763,8 +1824,8 @@ public class Jobs extends org.apache.man
}
- /** Return true if there is a job in either the READYFORDELETE state or the
- * SHUTTINGDOWN state. (This matches the conditions for values to be returned from
+ /** Return true if there is a job in the READYFORDELETE state. (This matches the
+ * conditions for values to be returned from
* getNextDeletableDocuments).
*@return true if such jobs exist.
*/
@@ -1773,9 +1834,24 @@ public class Jobs extends org.apache.man
{
ArrayList list = new ArrayList();
list.add(statusToString(STATUS_READYFORDELETE));
+ IResultSet set = performQuery("SELECT "+idField+" FROM "+getTableName()+" WHERE "+
+ statusField+"=? "+constructOffsetLimitClause(0,1),
+ list,new StringSet(getJobStatusKey()),null,1);
+ return set.getRowCount() > 0;
+ }
+
+ /** Return true if there is a job in the
+ * SHUTTINGDOWN state. (This matches the conditions for values to be returned from
+ * getNextCleanableDocuments).
+ *@return true if such jobs exist.
+ */
+ public boolean cleaningJobsPresent()
+ throws ManifoldCFException
+ {
+ ArrayList list = new ArrayList();
list.add(statusToString(STATUS_SHUTTINGDOWN));
IResultSet set = performQuery("SELECT "+idField+" FROM "+getTableName()+" WHERE "+
- statusField+" IN (?,?) "+constructOffsetLimitClause(0,1),
+ statusField+"=? "+constructOffsetLimitClause(0,1),
list,new StringSet(getJobStatusKey()),null,1);
return set.getRowCount() > 0;
}
Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,53 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+
+/** This class represents a document that will be placed on the document cleanup queue, and will be
+* processed by a cleanup worker thread. In addition to what DeleteQueuedDocument carries, it records
+* whether the document should also be removed from the index.
+*/
+public class CleanupQueuedDocument extends DeleteQueuedDocument
+{
+ public static final String _rcsid = "@(#)$Id$";
+
+ /** Flag indicating whether we should delete the document from the index or not. */
+ protected boolean deleteFromIndex;
+
+ /** Constructor.
+ *@param documentDescription is the document description.
+ *@param deleteFromIndex is true if the document should be removed from the index.
+ */
+ public CleanupQueuedDocument(DocumentDescription documentDescription, boolean deleteFromIndex)
+ {
+ super(documentDescription);
+ this.deleteFromIndex = deleteFromIndex;
+ }
+
+ /** Check if document should be removed from the index.
+ *@return true if it should be removed.
+ */
+ public boolean shouldBeRemovedFromIndex()
+ {
+ return deleteFromIndex;
+ }
+
+}
+
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,51 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import java.io.*;
+import java.util.*;
+
+/** Class which handles reset for cleanup thread pool (of which there's
+* typically only one member). The reset action here
+* is to move the status of documents from "BEINGCLEANED" back to "PURGATORY"
+* (see JobQueue.resetDocCleanupWorkerStatus), and then to empty the document cleanup queue.
+*/
+public class DocCleanupResetManager extends ResetManager
+{
+ public static final String _rcsid = "@(#)$Id$";
+
+ protected DocumentCleanupQueue ddq;
+
+ /** Constructor.
+ *@param ddq is the document cleanup queue that is cleared on reset.
+ */
+ public DocCleanupResetManager(DocumentCleanupQueue ddq)
+ {
+ super();
+ this.ddq = ddq;
+ }
+
+ /** Reset. Calls resetDocCleanupWorkerStatus() on the job manager, then clears the queue. */
+ protected void performResetLogic(IThreadContext tc)
+ throws ManifoldCFException
+ {
+ IJobManager jobManager = JobManagerFactory.make(tc);
+ jobManager.resetDocCleanupWorkerStatus();
+ ddq.clear();
+ }
+}
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,112 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import java.util.*;
+
+/** This class describes a cleanup document queue, which has a "stuffer" thread and many "reader" threads.
+* The queue manages thread synchronization so that (a) the "stuffer" thread blocks until queue is empty, and
+* (b) the "reader" threads block if queue is empty.
+* The objects being queued are all DocumentCleanupSet objects.
+* All access to the underlying list is synchronized on the list itself.
+*/
+public class DocumentCleanupQueue
+{
+ public static final String _rcsid = "@(#)$Id$";
+
+ // Since the queue has a maximum size, an ArrayList is a fine way to keep it
+ protected ArrayList queue = new ArrayList();
+
+ /** Constructor.
+ */
+ public DocumentCleanupQueue()
+ {
+ }
+
+ /** Wake up all threads waiting on this queue. This happens at the beginning of a reset.
+ * The queue contents are not changed by this call.
+ */
+ public void reset()
+ {
+ synchronized (queue)
+ {
+ queue.notifyAll();
+ }
+ }
+
+ /** Clear. This is only used on reset.
+ * All queued document sets are discarded; waiting threads are not notified.
+ */
+ public void clear()
+ {
+ synchronized (queue)
+ {
+ queue.clear();
+ }
+ }
+
+ /** Check if "empty".
+ *@param n is the low-water mark; if the number of queued sets is at or below this, then this method will return true.
+ *@return true if the queue holds n or fewer document sets, false otherwise.
+ */
+ public boolean checkIfEmpty(int n)
+ {
+ synchronized (queue)
+ {
+ if (queue.size() <= n)
+ return true;
+ }
+ return false;
+ }
+
+ /** Add a document set to the queue. This will be a set of n documents (where n is some chunk size
+ * set by experiment). One waiting thread, if any, is notified.
+ *@param dd is the document set.
+ */
+ public void addDocuments(DocumentCleanupSet dd)
+ {
+ synchronized (queue)
+ {
+ queue.add(dd);
+ queue.notify();
+ }
+ }
+
+ /** Pull a document set off the queue, and wait if there is
+ * nothing there. The set taken is the one at the end of the list, i.e. the most recently added.
+ *@return the document set.
+ */
+ public DocumentCleanupSet getDocuments()
+ throws InterruptedException
+ {
+ synchronized (queue)
+ {
+ // If queue is empty, go to sleep
+ while (queue.size() == 0)
+ queue.wait();
+ // If we've been awakened, there's either an entry to grab, or we've been awakened because it's time to reset.
+ // NOTE(review): the while loop above only exits once the queue is non-empty, so this null return looks unreachable as written.
+ if (queue.size() == 0)
+ return null;
+ // If we've been awakened, there's an entry to grab
+ DocumentCleanupSet dd = (DocumentCleanupSet)queue.remove(queue.size()-1);
+ return dd;
+ }
+ }
+
+
+}
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,61 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import java.util.*;
+
+/** This class is what's actually queued for cleanup threads. It represents an array of CleanupQueuedDocument objects,
+* of an appropriate size to be a decent chunk. It will be processed by a single cleanup worker thread, in bulk.
+*/
+public class DocumentCleanupSet
+{
+ public static final String _rcsid = "@(#)$Id$";
+
+ /** This is the array of documents to clean up. */
+ protected CleanupQueuedDocument[] documents;
+
+ /** Constructor.
+ *@param documents is the array representing the documents for this chunk. The array is kept by reference, not copied.
+ */
+ public DocumentCleanupSet(CleanupQueuedDocument[] documents)
+ {
+ this.documents = documents;
+ }
+
+ /** Get the number of documents.
+ *@return the number.
+ */
+ public int getCount()
+ {
+ return documents.length;
+ }
+
+ /** Get the nth document.
+ *@param index is the document number, used directly as an index into the array.
+ *@return the document.
+ */
+ public CleanupQueuedDocument getDocument(int index)
+ {
+ return documents[index];
+ }
+
+}
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,194 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.crawler.system.Logging;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This class looks for documents that need to be cleaned up (as part of an end-of-job cleanup), and
+* queues them up for the various document cleanup threads to take care of.
+* To do this, the thread repeatedly asks the job manager for a chunk of cleanable documents, wraps
+* that chunk in a DocumentCleanupSet, and adds it to the cleanup queue.
+* While the queue reports that it is not yet "empty" (see checkIfEmpty()), the thread polls,
+* sleeping 100ms between checks.  When the job manager has nothing to hand out, it sleeps for one second.
+*/
+public class DocumentCleanupStufferThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  // Local data
+  // This is a reference to the document cleanup queue we stuff
+  protected DocumentCleanupQueue documentCleanupQueue;
+  // This is the reset manager
+  protected DocCleanupResetManager resetManager;
+  // This value is handed to DocumentCleanupQueue.checkIfEmpty() to decide whether more stuffing is needed.
+  int n;
+
+  /** Constructor.
+  *@param documentCleanupQueue is the document cleanup queue we'll be stuffing.
+  *@param n is the value passed to the queue's checkIfEmpty() method; the original author describes it as
+  * the maximum number of threads that will be doing cleanup processing.
+  *@param resetManager is the reset manager this thread registers with and notifies on database connection errors.
+  */
+  public DocumentCleanupStufferThread(DocumentCleanupQueue documentCleanupQueue, int n, DocCleanupResetManager resetManager)
+    throws ManifoldCFException
+  {
+    super();
+    this.documentCleanupQueue = documentCleanupQueue;
+    this.n = n;
+    this.resetManager = resetManager;
+    setName("Document cleanup stuffer thread");
+    setDaemon(true);
+  }
+
+  /** Thread body.  Loops until interrupted, stuffing the cleanup queue one chunk at a time.
+  * An initialization failure, an out-of-memory condition, or a SETUP_ERROR terminates the whole process.
+  */
+  public void run()
+  {
+    resetManager.registerMe();
+
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      // The returned manager is not used below; the call is kept because any side effects
+      // of the factory are not visible from this class.
+      RepositoryConnectionManagerFactory.make(threadContext);
+      IJobManager jobManager = JobManagerFactory.make(threadContext);
+
+      IDBInterface database = DBInterfaceFactory.make(threadContext,
+        ManifoldCF.getMasterDatabaseName(),
+        ManifoldCF.getMasterDatabaseUsername(),
+        ManifoldCF.getMasterDatabasePassword());
+
+      // The chunk size requested from the job manager is the database's maximum IN-clause size.
+      int deleteChunkSize = database.getMaxInClause();
+
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          resetManager.waitForReset(threadContext);
+
+          // Wait until the cleanup queue is "empty" (meaning that some cleanup threads
+          // can run out of work if we don't act).
+          if (!documentCleanupQueue.checkIfEmpty(n))
+          {
+            ManifoldCF.sleep(100L);
+            continue;
+          }
+
+          Logging.threads.debug("Document cleanup stuffer thread woke up");
+
+          // Per the original author's note, this job manager call also sets the status of
+          // the returned documents to "beingcleaned".
+
+          // Get a single chunk at a time (but keep going until everything is stuffed)
+          DocumentSetAndFlags documentsToClean = jobManager.getNextCleanableDocuments(deleteChunkSize);
+          DocumentDescription[] descs = documentsToClean.getDocumentSet();
+          boolean[] removeFromIndex = documentsToClean.getFlags();
+
+          // If there are no chunks at all, then we can sleep for a while.
+          // The theory is that we need to allow stuff to accumulate.
+          if (descs.length == 0)
+          {
+            Logging.threads.debug("Document cleanup stuffer thread found nothing to do");
+            ManifoldCF.sleep(1000L); // 1 second
+            continue;
+          }
+
+          if (Logging.threads.isDebugEnabled())
+            Logging.threads.debug("Document cleanup stuffer thread found "+Integer.toString(descs.length)+" documents");
+
+          // Do the stuffing: pair each document description with its flag, in order.
+          CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[descs.length];
+          for (int k = 0; k < docDescs.length; k++)
+          {
+            docDescs[k] = new CleanupQueuedDocument(descs[k],removeFromIndex[k]);
+          }
+          DocumentCleanupSet set = new DocumentCleanupSet(docDescs);
+          documentCleanupQueue.addDocuments(set);
+
+          // If we don't wait here, the other threads don't have a chance to queue anything else up.
+          // The call must be qualified: since Java 14 "yield" is a restricted identifier and an
+          // unqualified yield() invocation no longer compiles.
+          Thread.yield();
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            // Tell the reset manager about the event and reset the queue before retrying.
+            resetManager.noteEvent();
+            documentCleanupQueue.reset();
+
+            Logging.threads.error("Cleanup stuffer thread aborting and restarting due to database connection reset",e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // Log it, but keep the thread alive
+          Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+
+          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+          {
+            // Shut the whole system down!
+            System.exit(1);
+          }
+
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("agents process ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("agents process could not start - shutting down");
+      Logging.threads.fatal("DocumentCleanupStufferThread initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+
+  }
+
+}
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
------------------------------------------------------------------------------
svn:keywords = Id