You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/10 10:24:00 UTC
svn commit: r1057136 - in
/incubator/lcf/branches/release-0.1-incubating-branch: ./
framework/core/src/main/java/org/apache/manifoldcf/core/database/
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/
framework/pull-agent/src/...
Author: kwright
Date: Mon Jan 10 09:24:00 2011
New Revision: 1057136
URL: http://svn.apache.org/viewvc?rev=1057136&view=rev
Log:
Pull up fixes for tickets CONNECTORS-146 and CONNECTORS-148 into release branch.
Added:
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
- copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
- copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
- copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
- copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
- copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
- copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
- copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
Modified:
incubator/lcf/branches/release-0.1-incubating-branch/ (props changed)
incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt
incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
Propchange: incubator/lcf/branches/release-0.1-incubating-branch/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 10 09:24:00 2011
@@ -1,2 +1,2 @@
/incubator/lcf/branches/release-0.1-branch:1056045
-/incubator/lcf/trunk:1039159,1041674,1041679,1041763,1041885,1041968,1042383,1042836-1042837,1042896,1042898,1043728,1044276,1044287,1044294,1044641,1049834,1050183,1050605,1056045,1056054,1056104,1056116,1056131-1056139,1056157,1056170-1056173,1056195,1056245,1056382-1056385,1056474,1056738
+/incubator/lcf/trunk:1039159,1041674,1041679,1041763,1041885,1041968,1042383,1042836-1042837,1042896,1042898,1043728,1044276,1044287,1044294,1044641,1049834,1050183,1050605,1056045,1056054,1056104,1056116,1056131-1056139,1056157,1056170-1056173,1056195,1056245,1056382-1056385,1056474,1056738,1057076,1057120
Modified: incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt Mon Jan 10 09:24:00 2011
@@ -4,6 +4,15 @@ $Id$
======================= Release 0.1 =======================
Release Date: See http://incubator.apache.org/connectors for the official release date.
+CONNECTORS-148: Creating the database in PostgreSQL failed intermittently
+when a parameter was used for the encoding; the PostgreSQL documentation
+specifies that it must be a quoted string in that case.
+(Karl Wright)
+
+CONNECTORS-146: Problem with document cleanup logic would cause data corruption
+in carrydown data and in hopcount information.
+(Karl Wright)
+
CONNECTORS-143: Copyright notice needs to be changed to 2011.
(Sebb, Karl Wright)
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java Mon Jan 10 09:24:00 2011
@@ -531,10 +531,8 @@ public class DBInterfacePostgreSQL exten
null,null,null,true,-1,null,null);
if (set.getRowCount() == 0)
{
- params.clear();
- params.add("utf8");
- masterDatabase.executeQuery("CREATE DATABASE "+databaseName+" OWNER="+
- userName+" ENCODING=?",params,null,invalidateKeys,null,false,0,null,null);
+ masterDatabase.executeQuery("CREATE DATABASE "+databaseName+" OWNER "+
+ userName+" ENCODING 'utf8'",null,null,invalidateKeys,null,false,0,null,null);
}
}
catch (ManifoldCFException e)
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Jan 10 09:24:00 2011
@@ -3,7 +3,7 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
+* this work for additional information regarding copyright ownership.f
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -158,6 +158,11 @@ public interface IJobManager
public void resetDocDeleteWorkerStatus()
throws ManifoldCFException;
+ /** Reset as part of restoring doc cleanup threads.
+ */
+ public void resetDocCleanupWorkerStatus()
+ throws ManifoldCFException;
+
/** Reset as part of restoring startup threads.
*/
public void resetStartupWorkerStatus()
@@ -365,6 +370,23 @@ public interface IJobManager
public void resetDeletingDocument(DocumentDescription documentDescription)
throws ManifoldCFException;
+ /** Reset a cleaning document back to its former state.
+ * This gets done when a cleanup thread sees a service interruption, etc., from the ingestion system.
+ */
+ public void resetCleaningDocument(DocumentDescription documentDescription)
+ throws ManifoldCFException;
+
+ /** Reset a set of cleaning documents for further processing in the future.
+ * This method is called after some unknown number of the documents were cleaned, but then an ingestion service interruption occurred.
+ * Note well: The logic here basically presumes that we cannot know whether the documents were indeed cleaned or not.
+ * If we knew for a fact that none of the documents had been handled, it would be possible to look at the document's
+ * current status and decide what the new status ought to be, based on a true rollback scenario. Such cases, however, are rare enough so that
+ * special logic is probably not worth it.
+ *@param documentDescriptions is the set of description objects for the documents that were being cleaned.
+ */
+ public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+ throws ManifoldCFException;
+
/** Add an initial set of documents to the queue.
* This method is called during job startup, when the queue is being loaded.
* A set of document references is passed to this method, which updates the status of the document
@@ -727,6 +749,14 @@ public interface IJobManager
public DocumentDescription[] getNextDeletableDocuments(int n)
throws ManifoldCFException;
+ /** Get list of cleanable document descriptions. This list will take into account
+ * multiple jobs that may own the same document.
+ *@param n is the maximum number of documents to return.
+ *@return the document descriptions for these documents.
+ */
+ public DocumentSetAndFlags getNextCleanableDocuments(int n)
+ throws ManifoldCFException;
+
/** Delete ingested document identifiers (as part of deleting the owning job).
* The number of identifiers specified is guaranteed to be less than the maxInClauseCount
* for the database.
@@ -755,8 +785,12 @@ public interface IJobManager
public void finishJobs()
throws ManifoldCFException;
- /** Reset eligible jobs back to "inactive" state. This method is used to pick up all jobs in the shutting down state
- * whose purgatory records have been all cleaned up.
+ /** Reset eligible jobs either back to the "inactive" state, or make them active again. The
+ * latter will occur if the cleanup phase of the job generated more pending documents.
+ *
+ * This method is used to pick up all jobs in the shutting down state
+ * whose purgatory or being-cleaned records have been all processed.
+ *
*@param currentTime is the current time in milliseconds since epoch.
*@param resetJobs is filled in with the set of IJobDescription objects that were reset.
*/
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java Mon Jan 10 09:24:00 2011
@@ -506,7 +506,7 @@ public class HopCount extends org.apache
}
/** Remove a set of document identifiers specified as a criteria. This will remove hopcount rows and
- * also intrinsic links that have the specified document identifiers are sources.
+ * also intrinsic links that have the specified document identifiers as sources.
*/
public void deleteMatchingDocuments(Long jobID, String[] legalLinkTypes,
String sourceTableName,
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Jan 10 09:24:00 2011
@@ -740,6 +740,16 @@ public class JobManager implements IJobM
Logging.jobs.debug("Reset complete");
}
+ /** Reset as part of restoring doc cleanup threads.
+ */
+ public void resetDocCleanupWorkerStatus()
+ throws ManifoldCFException
+ {
+ Logging.jobs.debug("Resetting doc cleaning status");
+ jobQueue.resetDocCleanupWorkerStatus();
+ Logging.jobs.debug("Reset complete");
+ }
+
/** Reset as part of restoring startup threads.
*/
public void resetStartupWorkerStatus()
@@ -765,6 +775,198 @@ public class JobManager implements IJobM
// carrydown records get removed when the job itself is removed.
}
+ /** Get list of cleanable document descriptions. This list will take into account
+ * multiple jobs that may own the same document. All documents for which a description
+ * is returned will be transitioned to the "beingcleaned" state. Documents which are
+ * not in transition and are eligible, but are owned by other jobs, will have their
+ * jobqueue entries deleted by this method.
+ *@param maxCount is the maximum number of documents to return.
+ *@return the document descriptions for these documents.
+ */
+ public DocumentSetAndFlags getNextCleanableDocuments(int maxCount)
+ throws ManifoldCFException
+ {
+ // The query will be built here, because it joins the jobs table against the jobqueue
+ // table.
+ //
+ // This query must only pick up documents that are not active in any job and
+ // which belong to a job that's in a "shutting down" state and are in
+ // a "purgatory" state.
+ //
+ // We are in fact more conservative in this query than we need to be; the documents
+ // excluded will include some that simply match our criteria, which is designed to
+ // be fast rather than perfect. The match we make is: hashvalue against hashvalue, and
+ // different job id's.
+ //
+ // SELECT id,jobid,docid FROM jobqueue t0 WHERE t0.status='P' AND EXISTS(SELECT 'x' FROM
+ // jobs t3 WHERE t0.jobid=t3.id AND t3.status='X')
+ // AND NOT EXISTS(SELECT 'x' FROM jobqueue t2 WHERE t0.hashval=t2.hashval AND t0.jobid!=t2.jobid
+ // AND t2.status IN ('A','F','B'))
+ //
+
+ // Do a simple preliminary query, since the big query is currently slow, so that we don't waste time during stasis or
+ // ingestion.
+ // Moved outside of transaction, so we have no chance of locking up job status cache key for an extended period of time.
+ if (!jobs.cleaningJobsPresent())
+ return new DocumentSetAndFlags(new DocumentDescription[0],new boolean[0]);
+
+ long startTime = 0L;
+ if (Logging.perf.isDebugEnabled())
+ {
+ startTime = System.currentTimeMillis();
+ Logging.perf.debug("Waiting to find documents to put on the cleaning queue");
+ }
+
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on cleaning queue");
+
+ // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being cleaned".
+ ArrayList list = new ArrayList();
+ list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
+
+ list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
+
+ list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+
+ IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
+ jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
+ jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+"=? "+
+ " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
+ " AND t1."+jobs.statusField+"=?"+
+ ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
+ jobQueue.docHashField+" AND t0."+jobQueue.jobIDField+"!=t2."+jobQueue.jobIDField+
+ " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
+ list,null,null,maxCount,null);
+
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Done getting docs to cleaning queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+
+ // We need to organize the returned set by connection name, so that we can efficiently
+ // use getUnindexableDocumentIdentifiers.
+ // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
+ // objects.
+ HashMap connectionNameMap = new HashMap();
+ HashMap documentIDMap = new HashMap();
+ int i = 0;
+ while (i < set.getRowCount())
+ {
+ IResultRow row = set.getRow(i);
+ Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+ String documentIDHash = (String)row.getValue(jobQueue.docHashField);
+ String documentID = (String)row.getValue(jobQueue.docIDField);
+ Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
+ Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+ // Failtime is probably not useful in this context, but we'll bring it along for completeness
+ long failTime;
+ if (failTimeValue == null)
+ failTime = -1L;
+ else
+ failTime = failTimeValue.longValue();
+ int failCount;
+ if (failCountValue == null)
+ failCount = 0;
+ else
+ failCount = (int)failCountValue.longValue();
+ IJobDescription jobDesc = load(jobID);
+ String connectionName = jobDesc.getConnectionName();
+ DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+ jobID,documentIDHash,documentID,failTime,failCount);
+ documentIDMap.put(documentIDHash,dd);
+ ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+ if (x == null)
+ {
+ // New entry needed
+ x = new ArrayList();
+ connectionNameMap.put(connectionName,x);
+ }
+ x.add(dd);
+ i++;
+ }
+
+ // For each bin, obtain a filtered answer, and enter all answers into a hash table.
+ // We'll then scan the result again to look up the right descriptions for return,
+ // and delete the ones that are owned multiply.
+ HashMap allowedDocIds = new HashMap();
+ Iterator iter = connectionNameMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String connectionName = (String)iter.next();
+ ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+ // Do the filter query
+ DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+ int j = 0;
+ while (j < descriptions.length)
+ {
+ descriptions[j] = (DocumentDescription)x.get(j);
+ j++;
+ }
+ String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName);
+ j = 0;
+ while (j < docIDHashes.length)
+ {
+ String docIDHash = docIDHashes[j++];
+ allowedDocIds.put(docIDHash,docIDHash);
+ }
+ }
+
+ // Now, assemble a result, and change the state of the records accordingly
+ DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+ boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+ i = 0;
+ iter = documentIDMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String docIDHash = (String)iter.next();
+ DocumentDescription dd = (DocumentDescription)documentIDMap.get(docIDHash);
+ // Determine whether we can delete it from the index or not
+ rvalBoolean[i] = (allowedDocIds.get(docIDHash) != null);
+ // Set the record status to "being cleaned" and return it
+ rval[i++] = dd;
+ jobQueue.setCleaningStatus(dd.getID());
+ }
+
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+
+ return new DocumentSetAndFlags(rval,rvalBoolean);
+
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+
/** Get list of deletable document descriptions. This list will take into account
* multiple jobs that may own the same document. All documents for which a description
* is returned will be transitioned to the "beingdeleted" state. Documents which are
@@ -789,9 +991,8 @@ public class JobManager implements IJobM
// be fast rather than perfect. The match we make is: hashvalue against hashvalue, and
// different job id's.
//
- // SELECT id,jobid,docid FROM jobqueue t0 WHERE ((t0.status IN ('C','P','G') AND EXISTS(SELECT 'x' FROM
- // jobs t1 WHERE t0.jobid=t1.id AND t1.status='D')) OR (t0.status='P' AND EXISTS(SELECT 'x' FROM
- // jobs t3 WHERE t0.jobid=t3.id AND t3.status='X')))
+ // SELECT id,jobid,docid FROM jobqueue t0 WHERE (t0.status IN ('C','P','G') AND EXISTS(SELECT 'x' FROM
+ // jobs t1 WHERE t0.jobid=t1.id AND t1.status='D')
// AND NOT EXISTS(SELECT 'x' FROM jobqueue t2 WHERE t0.hashval=t2.hashval AND t0.jobid!=t2.jobid
// AND t2.status IN ('A','F','B'))
//
@@ -827,27 +1028,21 @@ public class JobManager implements IJobM
list.add(jobs.statusToString(jobs.STATUS_READYFORDELETE));
- list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
-
- list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
-
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
- jobQueue.getTableName()+" t0 WHERE ((t0."+jobQueue.statusField+" IN (?,?,?) "+
+ jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+" IN (?,?,?) "+
" AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
" AND t1."+jobs.statusField+"=?"+
- ")) OR (t0."+jobQueue.statusField+"=?"+
- " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t3 WHERE t0."+jobQueue.jobIDField+"=t3."+jobs.idField+
- " AND t3."+jobs.statusField+"=?"+
- "))) AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
+ ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
jobQueue.docHashField+" AND t0."+jobQueue.jobIDField+"!=t2."+jobQueue.jobIDField+
- " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
+ " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
list,null,null,maxCount,null);
if (Logging.perf.isDebugEnabled())
@@ -1329,6 +1524,7 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
StringBuffer sb = new StringBuffer("SELECT t0.");
sb.append(jobQueue.idField).append(",t0.");
@@ -1346,7 +1542,7 @@ public class JobManager implements IJobM
sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE t0.")
.append(jobQueue.docHashField).append("=t2.").append(jobQueue.docHashField).append(" AND t0.")
.append(jobQueue.jobIDField).append("!=t2.").append(jobQueue.jobIDField).append(" AND t2.")
- .append(jobQueue.statusField).append(" IN (?,?,?,?,?))");
+ .append(jobQueue.statusField).append(" IN (?,?,?,?,?,?))");
sb.append(" ").append(database.constructOffsetLimitClause(0,n));
// Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
@@ -1775,6 +1971,7 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
sb.append("t0.").append(jobQueue.checkTimeField).append("<=? AND ");
sb.append("(t0.").append(jobQueue.checkActionField).append(" IS NULL OR t0.").append(jobQueue.checkActionField)
@@ -1785,7 +1982,7 @@ public class JobManager implements IJobM
sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE t0.")
.append(jobQueue.docHashField).append("=t2.").append(jobQueue.docHashField).append(" AND t0.")
.append(jobQueue.jobIDField).append("!=t2.").append(jobQueue.jobIDField).append(" AND t2.")
- .append(jobQueue.statusField).append(" IN (?,?,?,?,?)) AND ");
+ .append(jobQueue.statusField).append(" IN (?,?,?,?,?,?)) AND ");
// Prerequisite event clause: AND NOT EXISTS(SELECT 'x' FROM prereqevents t3,events t4 WHERE t3.ownerid=t0.id AND t3.name=t4.name)
sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.prereqEventManager.getTableName()).append(" t3,").append(eventManager.getTableName()).append(" t4 WHERE t0.")
@@ -2487,6 +2684,97 @@ public class JobManager implements IJobM
}
}
+ /** Reset a set of cleaning documents for further processing in the future.
+ * This method is called after some unknown number of the documents were cleaned, but then an ingestion service interruption occurred.
+ * Note well: The logic here basically presumes that we cannot know whether the documents were indeed cleaned or not.
+ * If we knew for a fact that none of the documents had been handled, it would be possible to look at the document's
+ * current status and decide what the new status ought to be, based on a true rollback scenario. Such cases, however, are rare enough so that
+ * special logic is probably not worth it.
+ *@param documentDescriptions is the set of description objects for the documents that were being cleaned.
+ */
+ public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+ throws ManifoldCFException
+ {
+ Long[] ids = new Long[documentDescriptions.length];
+ String[] docIDHashes = new String[documentDescriptions.length];
+
+ // First loop maps document identifier back to an index.
+ HashMap indexMap = new HashMap();
+ int i = 0;
+ while (i < documentDescriptions.length)
+ {
+ docIDHashes[i] =documentDescriptions[i].getDocumentIdentifierHash() + ":" + documentDescriptions[i].getJobID();
+ indexMap.put(docIDHashes[i],new Integer(i));
+ i++;
+ }
+
+ // Sort!
+ java.util.Arrays.sort(docIDHashes);
+
+ // Next loop populates the actual arrays we use to feed the operation so that the ordering is correct.
+ i = 0;
+ while (i < docIDHashes.length)
+ {
+ String docIDHash = docIDHashes[i];
+ Integer x = (Integer)indexMap.remove(docIDHash);
+ if (x == null)
+ throw new ManifoldCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
+ int index = x.intValue();
+ ids[i] = documentDescriptions[index].getID();
+ i++;
+ }
+
+ // Documents get marked PURGATORY regardless of their current state; this is because we can't know at this point what the actual prior state was.
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
+ {
+ // Going through ids in order should greatly reduce or eliminate chances of deadlock occurring. We thus need to pay attention to the sorted order.
+ i = 0;
+ while (i < ids.length)
+ {
+ jobQueue.setUncleaningStatus(ids[i]);
+ i++;
+ }
+
+ break;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction resetting cleaning documents: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+
+ /** Reset a cleaning document back to its former state.
+ * This gets done when a cleanup thread sees a service interruption, etc., from the ingestion system.
+ */
+ public void resetCleaningDocument(DocumentDescription documentDescription)
+ throws ManifoldCFException
+ {
+ resetCleaningDocumentMultiple(new DocumentDescription[]{documentDescription});
+ }
+
/** Reset a set of deleting documents for further processing in the future.
* This method is called after some unknown number of the documents were deleted, but then an ingestion service interruption occurred.
* Note well: The logic here basically presumes that we cannot know whether the documents were indeed processed or not.
@@ -5113,7 +5401,7 @@ public class JobManager implements IJobM
// This method must find only jobs that have nothing hanging around in their jobqueue that represents an ingested
// document. Any jobqueue entries which are in a state to interfere with the delete will be cleaned up by other
// threads, so eventually a job will become eligible. This happens when there are no records that have an ingested
- // status: complete, purgatory, being-deleted, or pending purgatory.
+ // status: complete, purgatory, being-cleaned, being-deleted, or pending purgatory.
database.beginTransaction();
try
{
@@ -5152,12 +5440,15 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
list.add(jobID);
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobID);
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
IResultSet confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
jobQueue.getTableName()+" WHERE "+
"("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
"("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
"("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
+ "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
"("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) "+database.constructOffsetLimitClause(0,1),list,null,null,1,null);
if (confirmSet.getRowCount() > 0)
@@ -5438,8 +5729,12 @@ public class JobManager implements IJobM
}
}
- /** Reset eligible jobs back to "inactive" state. This method is used to pick up all jobs in the shutting down state
- * whose purgatory or being-deleted records have been all cleaned up.
+ /** Reset eligible jobs either back to the "inactive" state, or make them active again. The
+ * latter will occur if the cleanup phase of the job generated more pending documents.
+ *
+ * This method is used to pick up all jobs in the shutting down state
+ * whose purgatory or being-cleaned records have all been processed.
+ *
*@param currentTime is the current time in milliseconds since epoch.
*@param resetJobs is filled in with the set of IJobDescription objects that were reset.
*/
@@ -5477,7 +5772,7 @@ public class JobManager implements IJobM
list.add(jobID);
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
list.add(jobID);
- list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
IResultSet confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
jobQueue.getTableName()+" WHERE "+
@@ -5487,14 +5782,40 @@ public class JobManager implements IJobM
if (confirmSet.getRowCount() > 0)
continue;
- IJobDescription jobDesc = jobs.load(jobID,true);
- resetJobs.add(jobDesc);
-
- // Label the job "finished"
- jobs.finishJob(jobID,currentTime);
- if (Logging.jobs.isDebugEnabled())
+ // The shutting-down phase is complete. However, we need to check if there are any outstanding
+ // PENDING or PENDINGPURGATORY records before we can decide what to do.
+ list.clear();
+ list.add(jobID);
+ list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
+ list.add(jobID);
+ list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
+
+ confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
+ jobQueue.getTableName()+" WHERE "+
+ "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
+ "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) "+database.constructOffsetLimitClause(0,1),list,null,null,1,null);
+
+ if (confirmSet.getRowCount() > 0)
{
- Logging.jobs.debug("Job "+jobID+" now completed");
+ // This job needs to re-enter the active state. Make that happen.
+ jobs.returnJobToActive(jobID);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Job "+jobID+" is re-entering active state");
+ }
+ }
+ else
+ {
+ // This job should be marked as finished.
+ IJobDescription jobDesc = jobs.load(jobID,true);
+ resetJobs.add(jobDesc);
+
+ // Label the job "finished"
+ jobs.finishJob(jobID,currentTime);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Job "+jobID+" now completed");
+ }
}
}
return;
@@ -5804,6 +6125,7 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
@@ -5857,6 +6179,7 @@ public class JobManager implements IJobM
.append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Processed'")
.append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Processed'")
.append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Being removed'")
+ .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Being removed'")
.append(" ELSE 'Unknown'")
.append(" END AS state,")
.append("CASE")
@@ -5982,6 +6305,7 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
list.add(jobQueue.actionToString(jobQueue.ACTION_RESCAN));
list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
@@ -6035,6 +6359,7 @@ public class JobManager implements IJobM
.append("CASE")
.append(" WHEN ")
.append(jobQueue.statusField).append("=?")
+ .append(" OR ").append(jobQueue.statusField).append("=?")
.append(" THEN 1 ELSE 0")
.append(" END")
.append(" as deleting,")
@@ -6177,9 +6502,10 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
- sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?,?,?,?,?)");
+ sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?,?,?,?,?,?)");
break;
}
k++;
@@ -6221,7 +6547,8 @@ public class JobManager implements IJobM
break;
case DOCSTATUS_DELETING:
list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
- sb.append(fieldPrefix).append(jobQueue.statusField).append("=?");
+ list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+ sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?)");
break;
case DOCSTATUS_READYFORPROCESSING:
list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Jan 10 09:24:00 2011
@@ -45,6 +45,7 @@ public class JobQueue extends org.apache
public final static int STATUS_BEINGDELETED = 6;
public final static int STATUS_ACTIVENEEDRESCAN = 7;
public final static int STATUS_ACTIVENEEDRESCANPURGATORY = 8;
+ public final static int STATUS_BEINGCLEANED = 9;
// Action values
public final static int ACTION_RESCAN = 0;
@@ -66,9 +67,12 @@ public class JobQueue extends org.apache
// an aborted job. On recovery, PENDING and ACTIVE records are deleted (since they were never
// completed), while PENDINGPURGATORY and ACTIVEPURGATORY records are retained but get marked as PURGATORY.
//
- // BEINGDELETED means that the document is queued to be cleaned up because the owning job is being
+ // BEINGDELETED means that the document is queued because the owning job is being
// deleted. It exists so that jobs that are active can avoid processing a document until the cleanup
// activity is done.
+ //
+ // BEINGCLEANED means that the document is queued because the owning job is in the SHUTTINGDOWN
+ // state, and the document was never encountered during the crawl.
// Field names
public static final String idField = "id";
@@ -98,6 +102,7 @@ public class JobQueue extends org.apache
statusMap.put("D",new Integer(STATUS_BEINGDELETED));
statusMap.put("a",new Integer(STATUS_ACTIVENEEDRESCAN));
statusMap.put("f",new Integer(STATUS_ACTIVENEEDRESCANPURGATORY));
+ statusMap.put("d",new Integer(STATUS_BEINGCLEANED));
}
protected static Map seedstatusMap;
@@ -307,6 +312,12 @@ public class JobQueue extends org.apache
list.add(statusToString(STATUS_BEINGDELETED));
performUpdate(map,"WHERE "+statusField+"=?",list,null);
+ // Map BEINGCLEANED to PURGATORY
+ map.put(statusField,statusToString(STATUS_PURGATORY));
+ list.clear();
+ list.add(statusToString(STATUS_BEINGCLEANED));
+ performUpdate(map,"WHERE "+statusField+"=?",list,null);
+
// Map newseed fields to seed
map.put(isSeedField,seedstatusToString(SEEDSTATUS_SEED));
list.clear();
@@ -376,6 +387,20 @@ public class JobQueue extends org.apache
performUpdate(map,"WHERE "+statusField+"=?",list,null);
}
+ /** Reset doc cleaning worker status.
+ * Every jobqueue record whose status is BEINGCLEANED is set back to PURGATORY.
+ */
+ public void resetDocCleanupWorkerStatus()
+ throws ManifoldCFException
+ {
+ // Selection criterion: rows currently marked BEINGCLEANED
+ ArrayList params = new ArrayList();
+ params.add(statusToString(STATUS_BEINGCLEANED));
+ // Replacement value: PURGATORY
+ HashMap newValues = new HashMap();
+ newValues.put(statusField,statusToString(STATUS_PURGATORY));
+ performUpdate(newValues,"WHERE "+statusField+"=?",params,null);
+ }
+
/** Prepare for a "full scan" job. This will not be called
* unless the job is in the "INACTIVE" state.
* This does the following:
@@ -642,6 +667,34 @@ public class JobQueue extends org.apache
performUpdate(map,"WHERE "+idField+"=?",list,null);
}
+ /** Set the status of a document to "being cleaned".
+ *@param id is the id of the jobqueue record to update.
+ */
+ public void setCleaningStatus(Long id)
+ throws ManifoldCFException
+ {
+ // New column value: BEINGCLEANED
+ HashMap newValues = new HashMap();
+ newValues.put(statusField,statusToString(STATUS_BEINGCLEANED));
+ // Restrict the update to the one record with the given id
+ ArrayList params = new ArrayList();
+ params.add(id);
+ performUpdate(newValues,"WHERE "+idField+"=?",params,null);
+ noteModifications(0,1,0);
+ }
+
+ /** Set the status of a document to be "no longer cleaning".
+ * The record's status becomes PURGATORY, and its check time, check action, fail time and
+ * fail count fields are all set to null.
+ *@param id is the id of the jobqueue record to update.
+ */
+ public void setUncleaningStatus(Long id)
+ throws ManifoldCFException
+ {
+ // Restrict the update to the one record with the given id
+ ArrayList params = new ArrayList();
+ params.add(id);
+ // Back to PURGATORY, with the scheduling and failure fields cleared
+ HashMap newValues = new HashMap();
+ newValues.put(statusField,statusToString(STATUS_PURGATORY));
+ newValues.put(checkTimeField,null);
+ newValues.put(checkActionField,null);
+ newValues.put(failTimeField,null);
+ newValues.put(failCountField,null);
+ performUpdate(newValues,"WHERE "+idField+"=?",params,null);
+ }
+
/** Remove multiple records entirely.
*@param ids is the set of job queue id's
*/
@@ -715,12 +768,14 @@ public class JobQueue extends org.apache
case STATUS_ACTIVEPURGATORY:
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
+ case STATUS_BEINGCLEANED:
+ // These are all the active states. Being in this state implies that a thread may be working on the document. We
+ // must not interrupt it.
// Initial adds never bring along any carrydown info, so we should be satisfied as long as the record exists.
break;
case STATUS_COMPLETE:
case STATUS_PURGATORY:
- case STATUS_BEINGDELETED:
// Set the status and time both
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
if (desiredExecuteTime == -1L)
@@ -996,6 +1051,7 @@ public class JobQueue extends org.apache
rval = true;
break;
case STATUS_COMPLETE:
+ case STATUS_BEINGCLEANED:
// Requeue the document for processing, if there have been other changes.
if (otherChangesSeen)
{
@@ -1222,6 +1278,8 @@ public class JobQueue extends org.apache
return "a";
case STATUS_ACTIVENEEDRESCANPURGATORY:
return "f";
+ case STATUS_BEINGCLEANED:
+ return "d";
default:
throw new ManifoldCFException("Bad status value: "+Integer.toString(status));
}
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Mon Jan 10 09:24:00 2011
@@ -1115,6 +1115,67 @@ public class Jobs extends org.apache.man
performUpdate(map,"WHERE "+idField+"=?",list,new StringSet(getJobStatusKey()));
}
+ /** Put job back into active state, from the shutting-down state.
+ * Which of the active statuses is chosen depends on whether the connector for the job's
+ * connection and the connector for the job's output are each reported as existing.
+ * A job that is not found, or is not in the shutting-down state, causes an exception.
+ *@param jobID is the job identifier.
+ */
+ public void returnJobToActive(Long jobID)
+ throws ManifoldCFException
+ {
+ beginTransaction();
+ try
+ {
+ ArrayList params = new ArrayList();
+ params.add(jobID);
+ // Read the job's row (selected FOR UPDATE) before changing its status
+ IResultSet results = performQuery("SELECT "+statusField+","+connectionNameField+","+outputNameField+" FROM "+getTableName()+" WHERE "+
+ idField+"=? FOR UPDATE",params,null,null);
+ if (results.getRowCount() == 0)
+ throw new ManifoldCFException("Can't find job "+jobID.toString());
+ IResultRow jobRow = results.getRow(0);
+ int currentStatus = stringToStatus((String)jobRow.getValue(statusField));
+
+ // Only a job that is shutting down may be handled here; anything else is a complaint.
+ if (currentStatus != STATUS_SHUTTINGDOWN)
+ throw new ManifoldCFException("Unexpected job status encountered: "+Integer.toString(currentStatus));
+
+ // Pick the flavor of "active" that reflects which connectors exist
+ boolean connectorPresent = connectionMgr.checkConnectorExists((String)jobRow.getValue(connectionNameField));
+ boolean outputPresent = outputMgr.checkConnectorExists((String)jobRow.getValue(outputNameField));
+ int newStatus;
+ if (connectorPresent)
+ newStatus = outputPresent ? STATUS_ACTIVE : STATUS_ACTIVE_NOOUTPUT;
+ else
+ newStatus = outputPresent ? STATUS_ACTIVE_UNINSTALLED : STATUS_ACTIVE_NEITHER;
+
+ HashMap newValues = new HashMap();
+ newValues.put(statusField,statusToString(newStatus));
+ performUpdate(newValues,"WHERE "+idField+"=?",params,new StringSet(getJobStatusKey()));
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
+ }
+
/** Make job active, and set the start time field.
*@param jobID is the job identifier.
*@param startTime is the current time in milliseconds from start of epoch.
@@ -1763,8 +1824,8 @@ public class Jobs extends org.apache.man
}
- /** Return true if there is a job in either the READYFORDELETE state or the
- * SHUTTINGDOWN state. (This matches the conditions for values to be returned from
+ /** Return true if there is a job in the READYFORDELETE state. (This matches the
+ * conditions for values to be returned from
* getNextDeletableDocuments).
*@return true if such jobs exist.
*/
@@ -1773,9 +1834,24 @@ public class Jobs extends org.apache.man
{
ArrayList list = new ArrayList();
list.add(statusToString(STATUS_READYFORDELETE));
+ IResultSet set = performQuery("SELECT "+idField+" FROM "+getTableName()+" WHERE "+
+ statusField+"=? "+constructOffsetLimitClause(0,1),
+ list,new StringSet(getJobStatusKey()),null,1);
+ return set.getRowCount() > 0;
+ }
+
+ /** Return true if there is a job in the
+ * SHUTTINGDOWN state. (This matches the conditions for values to be returned from
+ * getNextCleanableDocuments).
+ *@return true if such jobs exist.
+ */
+ public boolean cleaningJobsPresent()
+ throws ManifoldCFException
+ {
+ ArrayList list = new ArrayList();
+ // The single query parameter: the string form of the SHUTTINGDOWN status
list.add(statusToString(STATUS_SHUTTINGDOWN));
+ // The offset/limit clause is built with (0,1); any row coming back means such a job exists
IResultSet set = performQuery("SELECT "+idField+" FROM "+getTableName()+" WHERE "+
- statusField+" IN (?,?) "+constructOffsetLimitClause(0,1),
+ statusField+"=? "+constructOffsetLimitClause(0,1),
list,new StringSet(getJobStatusKey()),null,1);
return set.getRowCount() > 0;
}
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Mon Jan 10 09:24:00 2011
@@ -46,6 +46,8 @@ public class ManifoldCF extends org.apac
protected static ExpireThread[] expireThreads = null;
protected static DocumentDeleteStufferThread deleteStufferThread = null;
protected static DocumentDeleteThread[] deleteThreads = null;
+ protected static DocumentCleanupStufferThread cleanupStufferThread = null;
+ protected static DocumentCleanupThread[] cleanupThreads = null;
protected static JobResetThread jobResetThread = null;
protected static SeedingThread seedingThread = null;
protected static IdleCleanupThread idleCleanupThread = null;
@@ -56,11 +58,15 @@ public class ManifoldCF extends org.apac
protected static WorkerResetManager workerResetManager = null;
/** Delete thread pool reset manager */
protected static DocDeleteResetManager docDeleteResetManager = null;
+ /** Cleanup thread pool reset manager */
+ protected static DocCleanupResetManager docCleanupResetManager = null;
// Number of worker threads
protected static int numWorkerThreads = 0;
// Number of delete threads
protected static int numDeleteThreads = 0;
+ // Number of cleanup threads
+ protected static int numCleanupThreads = 0;
// Number of expiration threads
protected static int numExpireThreads = 0;
// Factor for low water level in queueing
@@ -70,6 +76,7 @@ public class ManifoldCF extends org.apac
protected static final String workerThreadCountProperty = "org.apache.manifoldcf.crawler.threads";
protected static final String deleteThreadCountProperty = "org.apache.manifoldcf.crawler.deletethreads";
+ protected static final String cleanupThreadCountProperty = "org.apache.manifoldcf.crawler.cleanupthreads";
protected static final String expireThreadCountProperty = "org.apache.manifoldcf.crawler.expirethreads";
protected static final String lowWaterFactorProperty = "org.apache.manifoldcf.crawler.lowwaterfactor";
protected static final String stuffAmtFactorProperty = "org.apache.manifoldcf.crawler.stuffamountfactor";
@@ -150,12 +157,18 @@ public class ManifoldCF extends org.apac
String maxDeleteThreads = getProperty(deleteThreadCountProperty);
if (maxDeleteThreads == null)
maxDeleteThreads = "10";
+ String maxCleanupThreads = getProperty(cleanupThreadCountProperty);
+ if (maxCleanupThreads == null)
+ maxCleanupThreads = "10";
String maxExpireThreads = getProperty(expireThreadCountProperty);
if (maxExpireThreads == null)
maxExpireThreads = "10";
numDeleteThreads = new Integer(maxDeleteThreads).intValue();
if (numDeleteThreads < 1 || numDeleteThreads > 300)
throw new ManifoldCFException("Illegal value for the number of delete threads");
+ numCleanupThreads = new Integer(maxCleanupThreads).intValue();
+ if (numCleanupThreads < 1 || numCleanupThreads > 300)
+ throw new ManifoldCFException("Illegal value for the number of cleanup threads");
numExpireThreads = new Integer(maxExpireThreads).intValue();
if (numExpireThreads < 1 || numExpireThreads > 300)
throw new ManifoldCFException("Illegal value for the number of expire threads");
@@ -180,12 +193,14 @@ public class ManifoldCF extends org.apac
DocumentQueue documentQueue = new DocumentQueue();
DocumentDeleteQueue documentDeleteQueue = new DocumentDeleteQueue();
+ DocumentCleanupQueue documentCleanupQueue = new DocumentCleanupQueue();
DocumentDeleteQueue expireQueue = new DocumentDeleteQueue();
BlockingDocuments blockingDocuments = new BlockingDocuments();
workerResetManager = new WorkerResetManager(documentQueue);
docDeleteResetManager = new DocDeleteResetManager(documentDeleteQueue);
+ docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue);
jobStartThread = new JobStartThread();
startupThread = new StartupThread(queueTracker);
@@ -220,6 +235,16 @@ public class ManifoldCF extends org.apac
deleteThreads[i] = new DocumentDeleteThread(Integer.toString(i),documentDeleteQueue,docDeleteResetManager);
i++;
}
+
+ cleanupStufferThread = new DocumentCleanupStufferThread(documentCleanupQueue,numCleanupThreads,docCleanupResetManager);
+ cleanupThreads = new DocumentCleanupThread[numCleanupThreads];
+ i = 0;
+ while (i < numCleanupThreads)
+ {
+ cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,queueTracker,docCleanupResetManager);
+ i++;
+ }
+
jobResetThread = new JobResetThread(queueTracker);
seedingThread = new SeedingThread(queueTracker);
idleCleanupThread = new IdleCleanupThread();
@@ -311,6 +336,14 @@ public class ManifoldCF extends org.apac
i++;
}
+ cleanupStufferThread.start();
+ i = 0;
+ while (i < numCleanupThreads)
+ {
+ cleanupThreads[i].start();
+ i++;
+ }
+
deleteStufferThread.start();
i = 0;
while (i < numDeleteThreads)
@@ -351,6 +384,7 @@ public class ManifoldCF extends org.apac
while (initializationThread != null || jobDeleteThread != null || startupThread != null || jobStartThread != null || stufferThread != null ||
finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null | expireThreads != null ||
deleteStufferThread != null || deleteThreads != null ||
+ cleanupStufferThread != null || cleanupThreads != null ||
jobResetThread != null || seedingThread != null || idleCleanupThread != null || setPriorityThread != null)
{
// Send an interrupt to all threads that are still there.
@@ -412,6 +446,20 @@ public class ManifoldCF extends org.apac
expireThread.interrupt();
}
}
+ if (cleanupStufferThread != null)
+ {
+ cleanupStufferThread.interrupt();
+ }
+ if (cleanupThreads != null)
+ {
+ int i = 0;
+ while (i < cleanupThreads.length)
+ {
+ Thread cleanupThread = cleanupThreads[i++];
+ if (cleanupThread != null)
+ cleanupThread.interrupt();
+ }
+ }
if (deleteStufferThread != null)
{
deleteStufferThread.interrupt();
@@ -534,6 +582,31 @@ public class ManifoldCF extends org.apac
expireThreads = null;
}
+ if (cleanupStufferThread != null)
+ {
+ if (!cleanupStufferThread.isAlive())
+ cleanupStufferThread = null;
+ }
+ if (cleanupThreads != null)
+ {
+ int i = 0;
+ boolean isAlive = false;
+ while (i < cleanupThreads.length)
+ {
+ Thread cleanupThread = cleanupThreads[i];
+ if (cleanupThread != null)
+ {
+ if (!cleanupThread.isAlive())
+ cleanupThreads[i] = null;
+ else
+ isAlive = true;
+ }
+ i++;
+ }
+ if (!isAlive)
+ cleanupThreads = null;
+ }
+
if (deleteStufferThread != null)
{
if (!deleteStufferThread.isAlive())