You are viewing a plain-text version of this content. The canonical link to it ("here" in the original page) is not preserved in this copy.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/11 15:33:24 UTC
svn commit: r1057662 - in
/incubator/lcf/branches/release-0.1-incubating-branch: ./
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/
framework/pull-agent...
Author: kwright
Date: Tue Jan 11 14:33:23 2011
New Revision: 1057662
URL: http://svn.apache.org/viewvc?rev=1057662&view=rev
Log:
Pull up (merge) the fix for CONNECTORS-149 from trunk to the release branch.
Modified:
incubator/lcf/branches/release-0.1-incubating-branch/ (props changed)
incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java
Propchange: incubator/lcf/branches/release-0.1-incubating-branch/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 11 14:33:23 2011
@@ -1,2 +1,2 @@
/incubator/lcf/branches/release-0.1-branch:1056045
-/incubator/lcf/trunk:1039159,1041674,1041679,1041763,1041885,1041968,1042383,1042836-1042837,1042896,1042898,1043728,1044276,1044287,1044294,1044641,1049834,1050183,1050605,1056045,1056054,1056104,1056116,1056131-1056139,1056157,1056170-1056173,1056195,1056245,1056382-1056385,1056474,1056738,1057076,1057120,1057137,1057573
+/incubator/lcf/trunk:1039159,1041674,1041679,1041763,1041885,1041968,1042383,1042836-1042837,1042896,1042898,1043728,1044276,1044287,1044294,1044641,1049834,1050183,1050605,1056045,1056054,1056104,1056116,1056131-1056139,1056157,1056170-1056173,1056195,1056245,1056382-1056385,1056474,1056738,1057076,1057120,1057137,1057573,1057659
Modified: incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt?rev=1057662&r1=1057661&r2=1057662&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt Tue Jan 11 14:33:23 2011
@@ -4,6 +4,12 @@ $Id$
======================= Release 0.1 =======================
Release Date: See http://incubator.apache.org/connectors for the official release date.
+CONNECTORS-149: Expire threads did not obey the rules for deleting documents
+that belong to other jobs. Also, the test for whether a document was shared between
+jobs did not take the output connection into account. Finally, I found and fixed an infinite
+loop in the job delete stuffer thread code.
+(Karl Wright)
+
CONNECTORS-147: Disable PDF's everywhere except for user documentation.
(Grant Ingersoll, David Crossley, Karl Wright)
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1057662&r1=1057661&r2=1057662&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Tue Jan 11 14:33:23 2011
@@ -210,7 +210,7 @@ public interface IJobManager
*@param currentTime is the current time.
*@return the array of document descriptions to expire.
*/
- public DocumentDescription[] getExpiredDocuments(int n, long currentTime)
+ public DocumentSetAndFlags getExpiredDocuments(int n, long currentTime)
throws ManifoldCFException;
// This method supports the "queue stuffer" thread
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1057662&r1=1057661&r2=1057662&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Tue Jan 11 14:33:23 2011
@@ -852,7 +852,7 @@ public class JobManager implements IJobM
if (Logging.perf.isDebugEnabled())
Logging.perf.debug("Done getting docs to cleaning queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
- // We need to organize the returned set by connection name, so that we can efficiently
+ // We need to organize the returned set by connection name and output connection name, so that we can efficiently
// use getUnindexableDocumentIdentifiers.
// This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
// objects.
@@ -880,15 +880,23 @@ public class JobManager implements IJobM
failCount = (int)failCountValue.longValue();
IJobDescription jobDesc = load(jobID);
String connectionName = jobDesc.getConnectionName();
+ String outputConnectionName = jobDesc.getOutputConnectionName();
DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
jobID,documentIDHash,documentID,failTime,failCount);
- documentIDMap.put(documentIDHash,dd);
- ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+ String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
+ documentIDMap.put(compositeDocumentID,dd);
+ Map y = (Map)connectionNameMap.get(connectionName);
+ if (y == null)
+ {
+ y = new HashMap();
+ connectionNameMap.put(connectionName,y);
+ }
+ ArrayList x = (ArrayList)y.get(outputConnectionName);
if (x == null)
{
// New entry needed
x = new ArrayList();
- connectionNameMap.put(connectionName,x);
+ y.put(outputConnectionName,x);
}
x.add(dd);
i++;
@@ -902,35 +910,52 @@ public class JobManager implements IJobM
while (iter.hasNext())
{
String connectionName = (String)iter.next();
- ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
- // Do the filter query
- DocumentDescription[] descriptions = new DocumentDescription[x.size()];
- int j = 0;
- while (j < descriptions.length)
- {
- descriptions[j] = (DocumentDescription)x.get(j);
- j++;
- }
- String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName);
- j = 0;
- while (j < docIDHashes.length)
- {
- String docIDHash = docIDHashes[j++];
- allowedDocIds.put(docIDHash,docIDHash);
+ Map y = (Map)connectionNameMap.get(connectionName);
+ Iterator outputIter = y.keySet().iterator();
+ while (outputIter.hasNext())
+ {
+ String outputConnectionName = (String)outputIter.next();
+ ArrayList x = (ArrayList)y.get(outputConnectionName);
+ // Do the filter query
+ DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+ int j = 0;
+ while (j < descriptions.length)
+ {
+ descriptions[j] = (DocumentDescription)x.get(j);
+ j++;
+ }
+ String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
+ j = 0;
+ while (j < docIDHashes.length)
+ {
+ String docIDHash = docIDHashes[j++];
+ String key = makeCompositeID(docIDHash,connectionName);
+ allowedDocIds.put(key,docIDHash);
+ }
}
}
// Now, assemble a result, and change the state of the records accordingly
- DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
- boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+ // First thing to do is order by document hash, so we reduce the risk of deadlock.
+ String[] compositeIDArray = new String[documentIDMap.size()];
i = 0;
iter = documentIDMap.keySet().iterator();
while (iter.hasNext())
{
- String docIDHash = (String)iter.next();
- DocumentDescription dd = (DocumentDescription)documentIDMap.get(docIDHash);
+ compositeIDArray[i++] = (String)iter.next();
+ }
+
+ java.util.Arrays.sort(compositeIDArray);
+
+ DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+ boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+ i = 0;
+ while (i < compositeIDArray.length)
+ {
+ String compositeDocID = compositeIDArray[i];
+ DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID);
// Determine whether we can delete it from the index or not
- rvalBoolean[i] = (allowedDocIds.get(docIDHash) != null);
+ rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null);
// Set the record status to "being cleaned" and return it
rval[i++] = dd;
jobQueue.setCleaningStatus(dd.getID());
@@ -967,6 +992,14 @@ public class JobManager implements IJobM
}
}
+ /** Create a composite document hash key. This consists of the document id hash plus the
+ * connection name.
+ */
+ protected static String makeCompositeID(String docIDHash, String connectionName)
+ {
+ return docIDHash + ":" + connectionName;
+ }
+
/** Get list of deletable document descriptions. This list will take into account
* multiple jobs that may own the same document. All documents for which a description
* is returned will be transitioned to the "beingdeleted" state. Documents which are
@@ -1076,15 +1109,23 @@ public class JobManager implements IJobM
failCount = (int)failCountValue.longValue();
IJobDescription jobDesc = load(jobID);
String connectionName = jobDesc.getConnectionName();
+ String outputConnectionName = jobDesc.getOutputConnectionName();
DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
jobID,documentIDHash,documentID,failTime,failCount);
- documentIDMap.put(documentIDHash,dd);
- ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+ String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
+ documentIDMap.put(compositeDocumentID,dd);
+ Map y = (Map)connectionNameMap.get(connectionName);
+ if (y == null)
+ {
+ y = new HashMap();
+ connectionNameMap.put(connectionName,y);
+ }
+ ArrayList x = (ArrayList)y.get(outputConnectionName);
if (x == null)
{
// New entry needed
x = new ArrayList();
- connectionNameMap.put(connectionName,x);
+ y.put(outputConnectionName,x);
}
x.add(dd);
i++;
@@ -1098,33 +1139,51 @@ public class JobManager implements IJobM
while (iter.hasNext())
{
String connectionName = (String)iter.next();
- ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
- // Do the filter query
- DocumentDescription[] descriptions = new DocumentDescription[x.size()];
- int j = 0;
- while (j < descriptions.length)
- {
- descriptions[j] = (DocumentDescription)x.get(j);
- j++;
- }
- String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName);
- j = 0;
- while (j < docIDHashes.length)
- {
- String docIDHash = docIDHashes[j++];
- allowedDocIds.put(docIDHash,docIDHash);
+ Map y = (Map)connectionNameMap.get(connectionName);
+ Iterator outputIter = y.keySet().iterator();
+ while (outputIter.hasNext())
+ {
+ String outputConnectionName = (String)outputIter.next();
+ ArrayList x = (ArrayList)y.get(outputConnectionName);
+ // Do the filter query
+ DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+ int j = 0;
+ while (j < descriptions.length)
+ {
+ descriptions[j] = (DocumentDescription)x.get(j);
+ j++;
+ }
+ String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
+ j = 0;
+ while (j < docIDHashes.length)
+ {
+ String docIDHash = docIDHashes[j++];
+ String key = makeCompositeID(docIDHash,connectionName);
+ allowedDocIds.put(key,docIDHash);
+ }
}
}
// Now, assemble a result, and change the state of the records accordingly
- DocumentDescription[] rval = new DocumentDescription[allowedDocIds.size()];
+ // First thing to do is order by document hash to reduce chances of deadlock.
+ String[] compositeIDArray = new String[documentIDMap.size()];
i = 0;
iter = documentIDMap.keySet().iterator();
while (iter.hasNext())
{
- String docIDHash = (String)iter.next();
- DocumentDescription dd = (DocumentDescription)documentIDMap.get(docIDHash);
- if (allowedDocIds.get(docIDHash) == null)
+ compositeIDArray[i++] = (String)iter.next();
+ }
+
+ java.util.Arrays.sort(compositeIDArray);
+
+ DocumentDescription[] rval = new DocumentDescription[allowedDocIds.size()];
+ int j = 0;
+ i = 0;
+ while (i < compositeIDArray.length)
+ {
+ String compositeDocumentID = compositeIDArray[i];
+ DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocumentID);
+ if (allowedDocIds.get(compositeDocumentID) == null)
{
// Delete this record and do NOT return it.
jobQueue.deleteRecord(dd.getID());
@@ -1140,9 +1199,10 @@ public class JobManager implements IJobM
else
{
// Set the record status to "being deleted" and return it
- rval[i++] = dd;
+ rval[j++] = dd;
jobQueue.setDeletingStatus(dd.getID());
}
+ i++;
}
if (Logging.perf.isDebugEnabled())
@@ -1177,13 +1237,14 @@ public class JobManager implements IJobM
}
/** Get a list of document identifiers that should actually be deleted from the index, from a list that
- * might contain identifiers that are shared with other jobs. The input list is guaranteed to be
- * smaller in size than maxInClauseCount for the database.
+ * might contain identifiers that are shared with other jobs, which are targeted to the same output connection.
+ * The input list is guaranteed to be smaller in size than maxInClauseCount for the database.
*@param documentIdentifiers is the set of document identifiers to consider.
*@param connectionName is the connection name for ALL the document identifiers.
+ *@param outputConnectionName is the output connection name for ALL the document identifiers.
*@return the set of documents which should be removed from the index.
*/
- protected String[] getUnindexableDocumentIdentifiers(DocumentDescription[] documentIdentifiers, String connectionName)
+ protected String[] getUnindexableDocumentIdentifiers(DocumentDescription[] documentIdentifiers, String connectionName, String outputConnectionName)
throws ManifoldCFException
{
// This is where we will count the individual document id's
@@ -1233,8 +1294,9 @@ public class JobManager implements IJobM
// will never be removed from the index.
//
// Instead, the only solution is to not queue a document for any activity that is inconsistent with activities
- // that may already be ongoing for that document. For this reason, I have introduced a "BEING_DELETED" state
- // for a document. This state will allow the various queries that queue up activities to avoid documents that
+ // that may already be ongoing for that document. For this reason, I have introduced a "BEING_DELETED"
+ // and "BEING_CLEANED" state
+ // for a document. These states will allow the various queries that queue up activities to avoid documents that
// are currently being processed elsewhere.
list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
@@ -1242,11 +1304,13 @@ public class JobManager implements IJobM
list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
list.add(connectionName);
+ list.add(outputConnectionName);
sb.append(") AND t0.").append(jobQueue.statusField).append(" IN (?,?,?) AND EXISTS(SELECT 'x' FROM ")
.append(jobs.getTableName()).append(" t1 WHERE t0.")
.append(jobQueue.jobIDField).append("=t1.").append(jobs.idField).append(" AND t1.")
- .append(jobs.connectionNameField).append("=?)");
+ .append(jobs.connectionNameField).append("=? AND t1.")
+ .append(jobs.outputNameField).append("=?)");
// Do the query, and then count the number of times each document identifier occurs.
IResultSet results = database.performQuery(sb.toString(),list,null,null);
@@ -1489,14 +1553,14 @@ public class JobManager implements IJobM
*@param currentTime is the current time.
*@return the array of document descriptions to expire.
*/
- public DocumentDescription[] getExpiredDocuments(int n, long currentTime)
+ public DocumentSetAndFlags getExpiredDocuments(int n, long currentTime)
throws ManifoldCFException
{
// Screening query
// Moved outside of transaction, so there's less chance of keeping jobstatus cache key tied up
// for an extended period of time.
if (!jobs.activeJobsPresent())
- return new DocumentDescription[0];
+ return new DocumentSetAndFlags(new DocumentDescription[0], new boolean[0]);
long startTime = 0L;
if (Logging.perf.isDebugEnabled())
@@ -1572,21 +1636,21 @@ public class JobManager implements IJobM
// To avoid deadlock, we want to update the document id hashes in order. This means reading into a structure I can sort by docid hash,
// before updating any rows in jobqueue.
- String[] docIDHashes = new String[set.getRowCount()];
- Map storageMap = new HashMap();
+ HashMap connectionNameMap = new HashMap();
+ HashMap documentIDMap = new HashMap();
Map statusMap = new HashMap();
int i = 0;
while (i < set.getRowCount())
{
IResultRow row = set.getRow(i);
- Long id = (Long)row.getValue(jobQueue.idField);
Long jobID = (Long)row.getValue(jobQueue.jobIDField);
- String docIDHash = (String)row.getValue(jobQueue.docHashField);
- String docID = (String)row.getValue(jobQueue.docIDField);
+ String documentIDHash = (String)row.getValue(jobQueue.docHashField);
+ String documentID = (String)row.getValue(jobQueue.docIDField);
int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString());
Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+ // Failtime is probably not useful in this context, but we'll bring it along for completeness
long failTime;
if (failTimeValue == null)
failTime = -1L;
@@ -1594,35 +1658,95 @@ public class JobManager implements IJobM
failTime = failTimeValue.longValue();
int failCount;
if (failCountValue == null)
- failCount = -1;
+ failCount = 0;
else
failCount = (int)failCountValue.longValue();
-
- DocumentDescription dd = new DocumentDescription(id,jobID,docIDHash,docID,failTime,failCount);
- docIDHashes[i] = docIDHash + ":" +jobID;
- storageMap.put(docIDHashes[i],dd);
- statusMap.put(docIDHashes[i],new Integer(status));
+ IJobDescription jobDesc = load(jobID);
+ String connectionName = jobDesc.getConnectionName();
+ String outputConnectionName = jobDesc.getOutputConnectionName();
+ DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+ jobID,documentIDHash,documentID,failTime,failCount);
+ String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
+ documentIDMap.put(compositeDocumentID,dd);
+ statusMap.put(compositeDocumentID,new Integer(status));
+ Map y = (Map)connectionNameMap.get(connectionName);
+ if (y == null)
+ {
+ y = new HashMap();
+ connectionNameMap.put(connectionName,y);
+ }
+ ArrayList x = (ArrayList)y.get(outputConnectionName);
+ if (x == null)
+ {
+ // New entry needed
+ x = new ArrayList();
+ y.put(outputConnectionName,x);
+ }
+ x.add(dd);
i++;
}
- // No duplicates are possible here
- java.util.Arrays.sort(docIDHashes);
+ // For each bin, obtain a filtered answer, and enter all answers into a hash table.
+ // We'll then scan the result again to look up the right descriptions for return,
+ // and delete the ones that are owned multiply.
+ HashMap allowedDocIds = new HashMap();
+ Iterator iter = connectionNameMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String connectionName = (String)iter.next();
+ Map y = (Map)connectionNameMap.get(connectionName);
+ Iterator outputIter = y.keySet().iterator();
+ while (outputIter.hasNext())
+ {
+ String outputConnectionName = (String)outputIter.next();
+ ArrayList x = (ArrayList)y.get(outputConnectionName);
+ // Do the filter query
+ DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+ int j = 0;
+ while (j < descriptions.length)
+ {
+ descriptions[j] = (DocumentDescription)x.get(j);
+ j++;
+ }
+ String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
+ j = 0;
+ while (j < docIDHashes.length)
+ {
+ String docIDHash = docIDHashes[j++];
+ String key = makeCompositeID(docIDHash,connectionName);
+ allowedDocIds.put(key,docIDHash);
+ }
+ }
+ }
+ // Now, assemble a result, and change the state of the records accordingly
+ // First thing to do is order by document hash, so we reduce the risk of deadlock.
+ String[] compositeIDArray = new String[documentIDMap.size()];
i = 0;
- while (i < docIDHashes.length)
+ iter = documentIDMap.keySet().iterator();
+ while (iter.hasNext())
{
- String docIDHash = docIDHashes[i];
- DocumentDescription dd = (DocumentDescription)storageMap.get(docIDHash);
- Long id = dd.getID();
- int status = ((Integer)statusMap.get(docIDHash)).intValue();
-
- // Set status to "ACTIVE".
- jobQueue.updateActiveRecord(id,status);
+ compositeIDArray[i++] = (String)iter.next();
+ }
+
+ java.util.Arrays.sort(compositeIDArray);
+
+ DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+ boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+ i = 0;
+ while (i < compositeIDArray.length)
+ {
+ String compositeDocID = compositeIDArray[i];
+ DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID);
+ // Determine whether we can delete it from the index or not
+ rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null);
+ // Set the record status to "being cleaned" and return it
+ rval[i++] = dd;
+ jobQueue.updateActiveRecord(dd.getID(),((Integer)statusMap.get(compositeDocID)).intValue());
+ }
- answers.add(dd);
+ return new DocumentSetAndFlags(rval, rvalBoolean);
- i++;
- }
}
catch (ManifoldCFException e)
{
@@ -1647,14 +1771,6 @@ public class JobManager implements IJobM
sleepFor(sleepAmt);
}
- DocumentDescription[] rval = new DocumentDescription[answers.size()];
- int k = 0;
- while (k < rval.length)
- {
- rval[k] = (DocumentDescription)answers.get(k);
- k++;
- }
- return rval;
}
}
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java?rev=1057662&r1=1057661&r2=1057662&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java Tue Jan 11 14:33:23 2011
@@ -34,7 +34,7 @@ public class ExpireStufferThread extends
// Local data
// This is a reference to the static main document expiration queue
- protected DocumentDeleteQueue documentQueue;
+ protected DocumentCleanupQueue documentQueue;
/** Worker thread pool reset manager */
protected WorkerResetManager resetManager;
// This is the number of entries we want to stuff at any one time.
@@ -45,7 +45,7 @@ public class ExpireStufferThread extends
*@param n represents the number of threads that will be processing queued stuff, NOT the
* number of documents to be done at once!
*/
- public ExpireStufferThread(DocumentDeleteQueue documentQueue, int n, WorkerResetManager resetManager)
+ public ExpireStufferThread(DocumentCleanupQueue documentQueue, int n, WorkerResetManager resetManager)
throws ManifoldCFException
{
super();
@@ -115,8 +115,10 @@ public class ExpireStufferThread extends
// The number n passed in here thus cannot be used in a query to limit the number of returned
// results. Instead, it must be factored into the limit portion of the query.
long currentTime = System.currentTimeMillis();
- DocumentDescription[] descs = jobManager.getExpiredDocuments(deleteChunkSize,currentTime);
-
+ DocumentSetAndFlags docsAndFlags = jobManager.getExpiredDocuments(deleteChunkSize,currentTime);
+ DocumentDescription[] descs = docsAndFlags.getDocumentSet();
+ boolean[] deleteFromIndex = docsAndFlags.getFlags();
+
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
@@ -134,14 +136,14 @@ public class ExpireStufferThread extends
}
// Do the stuffing
- DeleteQueuedDocument[] docDescs = new DeleteQueuedDocument[descs.length];
+ CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[descs.length];
int k = 0;
while (k < docDescs.length)
{
- docDescs[k] = new DeleteQueuedDocument(descs[k]);
+ docDescs[k] = new CleanupQueuedDocument(descs[k],deleteFromIndex[k]);
k++;
}
- DocumentDeleteSet set = new DocumentDeleteSet(docDescs);
+ DocumentCleanupSet set = new DocumentCleanupSet(docDescs);
documentQueue.addDocuments(set);
// If we don't wait here, the other threads don't seem to have a chance to queue anything else up.
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1057662&r1=1057661&r2=1057662&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Tue Jan 11 14:33:23 2011
@@ -35,7 +35,7 @@ public class ExpireThread extends Thread
// Local data
protected String id;
// This is a reference to the static main document queue
- protected DocumentDeleteQueue documentQueue;
+ protected DocumentCleanupQueue documentQueue;
/** Worker thread pool reset manager */
protected WorkerResetManager resetManager;
/** Queue tracker */
@@ -44,7 +44,7 @@ public class ExpireThread extends Thread
/** Constructor.
*@param id is the expire thread id.
*/
- public ExpireThread(String id, DocumentDeleteQueue documentQueue, QueueTracker queueTracker, WorkerResetManager resetManager)
+ public ExpireThread(String id, DocumentCleanupQueue documentQueue, QueueTracker queueTracker, WorkerResetManager resetManager)
throws ManifoldCFException
{
super();
@@ -86,7 +86,7 @@ public class ExpireThread extends Thread
// we update its status, even if there is an exception!!!
// See if there is anything on the queue for me
- DocumentDeleteSet dds = documentQueue.getDocuments();
+ DocumentCleanupSet dds = documentQueue.getDocuments();
if (dds == null)
// It's a reset, so recycle
continue;
@@ -104,7 +104,7 @@ public class ExpireThread extends Thread
int j = 0;
while (j < dds.getCount())
{
- DeleteQueuedDocument dqd = dds.getDocument(j++);
+ CleanupQueuedDocument dqd = dds.getDocument(j++);
DocumentDescription ddd = dqd.getDocumentDescription();
Long jobID = ddd.getJobID();
IJobDescription job = jobManager.load(jobID,true);
@@ -138,7 +138,7 @@ public class ExpireThread extends Thread
j = 0;
while (j < list.size())
{
- DeleteQueuedDocument dqd = (DeleteQueuedDocument)list.get(j);
+ CleanupQueuedDocument dqd = (CleanupQueuedDocument)list.get(j);
DocumentDescription ddd = dqd.getDocumentDescription();
Long jobID = ddd.getJobID();
IJobDescription job = jobManager.load(jobID,true);
@@ -188,16 +188,32 @@ public class ExpireThread extends Thread
{
String outputConnectionName = (String)outputIterator.next();
ArrayList indexList = (ArrayList)outputMap.get(outputConnectionName);
- String[] docClassesToRemove = new String[indexList.size()];
- String[] hashedDocsToRemove = new String[indexList.size()];
+ // Count the number of docs to actually delete. This will be a subset of the documents in the list.
+ int k = 0;
+ int removeCount = 0;
+ while (k < indexList.size())
+ {
+ int index = ((Integer)indexList.get(k++)).intValue();
+ if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
+ removeCount++;
+ }
+
+ // Allocate removal arrays
+ String[] docClassesToRemove = new String[removeCount];
+ String[] hashedDocsToRemove = new String[removeCount];
// Now, iterate over the index list
- int k = 0;
+ k = 0;
+ removeCount = 0;
while (k < indexList.size())
{
int index = ((Integer)indexList.get(k)).intValue();
- docClassesToRemove[k] = (String)arrayDocClasses.get(index);
- hashedDocsToRemove[k] = (String)arrayDocHashes.get(index);
+ if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
+ {
+ docClassesToRemove[removeCount] = (String)arrayDocClasses.get(index);
+ hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(index);
+ removeCount++;
+ }
k++;
}
@@ -235,7 +251,7 @@ public class ExpireThread extends Thread
{
int index = ((Integer)indexList.get(k)).intValue();
- DeleteQueuedDocument dqd = (DeleteQueuedDocument)arrayDocsToDelete.get(index);
+ CleanupQueuedDocument dqd = (CleanupQueuedDocument)arrayDocsToDelete.get(index);
DocumentDescription ddd = dqd.getDocumentDescription();
Long jobID = ddd.getJobID();
int hopcountMethod = ((Integer)hopcountMethods.get(index)).intValue();
@@ -287,7 +303,7 @@ public class ExpireThread extends Thread
int j = 0;
while (j < dds.getCount())
{
- DeleteQueuedDocument dqd = dds.getDocument(j);
+ CleanupQueuedDocument dqd = dds.getDocument(j);
if (dqd.wasProcessed() == false)
{
DocumentDescription ddd = dqd.getDocumentDescription();
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1057662&r1=1057661&r2=1057662&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Tue Jan 11 14:33:23 2011
@@ -194,11 +194,11 @@ public class ManifoldCF extends org.apac
DocumentQueue documentQueue = new DocumentQueue();
DocumentDeleteQueue documentDeleteQueue = new DocumentDeleteQueue();
DocumentCleanupQueue documentCleanupQueue = new DocumentCleanupQueue();
- DocumentDeleteQueue expireQueue = new DocumentDeleteQueue();
+ DocumentCleanupQueue expireQueue = new DocumentCleanupQueue();
BlockingDocuments blockingDocuments = new BlockingDocuments();
- workerResetManager = new WorkerResetManager(documentQueue);
+ workerResetManager = new WorkerResetManager(documentQueue,expireQueue);
docDeleteResetManager = new DocDeleteResetManager(documentDeleteQueue);
docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue);
Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java?rev=1057662&r1=1057661&r2=1057662&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java Tue Jan 11 14:33:23 2011
@@ -32,12 +32,15 @@ public class WorkerResetManager extends
/** The document queue */
protected DocumentQueue dq;
+ /** The expiration queue */
+ protected DocumentCleanupQueue eq;
/** Constructor. */
- public WorkerResetManager(DocumentQueue dq)
+ public WorkerResetManager(DocumentQueue dq, DocumentCleanupQueue eq)
{
super();
this.dq = dq;
+ this.eq = eq;
}
/** Reset */
@@ -47,6 +50,7 @@ public class WorkerResetManager extends
IJobManager jobManager = JobManagerFactory.make(tc);
jobManager.resetDocumentWorkerStatus();
dq.clear();
+ eq.clear();
}
}