You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/11/10 14:00:51 UTC
svn commit: r1637833 - in
/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler:
interfaces/ jobs/ system/
Author: kwright
Date: Mon Nov 10 13:00:51 2014
New Revision: 1637833
URL: http://svn.apache.org/r1637833
Log:
Take out priorityset field, and get everything to build again.
Modified:
manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Nov 10 13:00:51 2014
@@ -217,21 +217,18 @@ public interface IJobManager
/** Get a list of not-yet-processed documents to reprioritize. Documents in all jobs will be
* returned by this method. Up to n document descriptions will be returned.
- *@param currentTime is the current time stamp for this prioritization pass. Avoid
- * picking up any documents that are labeled with this timestamp or after.
*@param n is the maximum number of document descriptions desired.
*@return the document descriptions.
*/
- public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(long currentTime, int n)
+ public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(int n)
throws ManifoldCFException;
/** Save a set of document priorities. In the case where a document was eligible to have its
* priority set, but it no longer is eligible, then the provided priority will not be written.
- *@param currentTime is the time in milliseconds since epoch.
*@param descriptions are the document descriptions.
*@param priorities are the desired priorities.
*/
- public void writeDocumentPriorities(long currentTime, DocumentDescription[] descriptions, IPriorityCalculator[] priorities)
+ public void writeDocumentPriorities(DocumentDescription[] descriptions, IPriorityCalculator[] priorities)
throws ManifoldCFException;
// This method supports the "expiration" thread
@@ -414,7 +411,7 @@ public interface IJobManager
*@param documentDescriptions is the set of description objects for the documents that have had their parent carrydown information changed.
*@param docPriorities are the document priorities to assign to the documents, if needed.
*/
- public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, IPriorityCalculator[] docPriorities)
+ public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, IPriorityCalculator[] docPriorities)
throws ManifoldCFException;
/** Requeue a document because of carrydown changes.
@@ -423,7 +420,7 @@ public interface IJobManager
*@param documentDescription is the description object for the document that has had its parent carrydown information changed.
*@param docPriority is the document priority to assign to the document, if needed.
*/
- public void carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, IPriorityCalculator docPriority)
+ public void carrydownChangeDocument(DocumentDescription documentDescription, IPriorityCalculator docPriority)
throws ManifoldCFException;
/** Requeue a document for further processing in the future.
@@ -558,14 +555,13 @@ public interface IJobManager
*@param docIDs are the local document identifiers.
*@param overrideSchedule is true if any existing document schedule should be overridden.
*@param hopcountMethod is either accurate, nodelete, or neverdelete.
- *@param currentTime is the current time in milliseconds since epoch.
*@param documentPriorities are the document priorities corresponding to the document identifiers.
*@param prereqEventNames are the events that must be completed before each document can be processed.
*/
public void addDocumentsInitial(String processID,
Long jobID, String[] legalLinkTypes,
String[] docIDHashes, String[] docIDs, boolean overrideSchedule,
- int hopcountMethod, long currentTime, IPriorityCalculator[] documentPriorities,
+ int hopcountMethod, IPriorityCalculator[] documentPriorities,
String[][] prereqEventNames)
throws ManifoldCFException;
@@ -650,7 +646,6 @@ public interface IJobManager
*@param dataNames are the names of the data to carry down to the child from this parent.
*@param dataValues are the values to carry down to the child from this parent, corresponding to dataNames above. If CharacterInput objects are passed in here,
* it is the caller's responsibility to clean these up.
- *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
*@param priority is the desired document priority for the document.
*@param prereqEventNames are the events that must be completed before the document can be processed.
*/
@@ -660,7 +655,7 @@ public interface IJobManager
String parentIdentifierHash,
String relationshipType,
int hopcountMethod, String[] dataNames, Object[][] dataValues,
- long currentTime, IPriorityCalculator priority, String[] prereqEventNames)
+ IPriorityCalculator priority, String[] prereqEventNames)
throws ManifoldCFException;
/** Add documents to the queue in bulk.
@@ -680,7 +675,6 @@ public interface IJobManager
*@param dataNames are the names of the data to carry down to the child from this parent.
*@param dataValues are the values to carry down to the child from this parent, corresponding to dataNames above. If CharacterInput objects are passed in here,
* it is the caller's responsibility to clean these up.
- *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
*@param priorities are the desired document priorities for the documents.
*@param prereqEventNames are the events that must be completed before each document can be processed.
*/
@@ -690,7 +684,7 @@ public interface IJobManager
String parentIdentifierHash,
String relationshipType,
int hopcountMethod, String[][] dataNames, Object[][][] dataValues,
- long currentTime, IPriorityCalculator[] priorities,
+ IPriorityCalculator[] priorities,
String[][] prereqEventNames)
throws ManifoldCFException;
Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Nov 10 13:00:51 2014
@@ -2016,13 +2016,11 @@ public class JobManager implements IJobM
/** Get a list of not-yet-processed documents to reprioritize. Documents in all jobs will be
* returned by this method. Up to n document descriptions will be returned.
- *@param currentTime is the current time stamp for this prioritization pass. Avoid
- * picking up any documents that are labeled with this timestamp or after.
*@param n is the maximum number of document descriptions desired.
*@return the document descriptions.
*/
@Override
- public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(long currentTime, int n)
+ public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(int n)
throws ManifoldCFException
{
StringBuilder sb = new StringBuilder("SELECT ");
@@ -2034,41 +2032,10 @@ public class JobManager implements IJobM
.append(jobQueue.docHashField).append(",")
.append(jobQueue.docIDField).append(",")
.append(jobQueue.jobIDField)
- .append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ")
- .append(database.buildConjunctionClause(list,new ClauseDescription[]{
- new MultiClause(jobQueue.statusField,new Object[]{
- JobQueue.statusToString(jobQueue.STATUS_HOPCOUNTREMOVED),
- JobQueue.statusToString(jobQueue.STATUS_PENDING),
- JobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)}),
- new UnitaryClause(jobQueue.prioritySetField,"<",new Long(currentTime))}));
-
- sb.append(" AND ")
- .append(jobQueue.checkActionField).append("=?");
- list.add(jobQueue.actionToString(JobQueue.ACTION_RESCAN));
-
- // Per CONNECTORS-290, we need to be leaving priorities blank for jobs that aren't using them,
- // so this will be changed to not include jobs where the priorities have been bashed to null.
- //
- // I've included ALL states that might have non-null doc priorities. This includes states
- // corresponding to uninstalled connectors, since there is no transition that cleans out the
- // document priorities in these states. The time during which a connector is uninstalled is
- // expected to be short, because typically this state is the result of an installation procedure
- // rather than willful action on the part of a user.
-
- sb.append(" AND EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ")
+ .append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
.append(database.buildConjunctionClause(list,new ClauseDescription[]{
- new MultiClause("t1."+jobs.statusField,new Object[]{
- Jobs.statusToString(Jobs.STATUS_STARTINGUP),
- Jobs.statusToString(Jobs.STATUS_STARTINGUPMINIMAL),
- Jobs.statusToString(Jobs.STATUS_ACTIVE),
- Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING),
- Jobs.statusToString(Jobs.STATUS_ACTIVE_UNINSTALLED),
- Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED)
- }),
- new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)}))
- .append(") ");
-
- sb.append(database.constructOffsetLimitClause(0,n));
+ new UnitaryClause(jobQueue.needPriorityField,jobQueue.needPriorityToString(true))})).append(" ")
+ .append(database.constructOffsetLimitClause(0,n));
// Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
//jobQueue.unconditionallyAnalyzeTables();
@@ -2094,12 +2061,11 @@ public class JobManager implements IJobM
/** Save a set of document priorities. In the case where a document was eligible to have its
* priority set, but it no longer is eligible, then the provided priority will not be written.
- *@param currentTime is the time in milliseconds since epoch.
*@param documentDescriptions are the document descriptions.
*@param priorities are the desired priorities.
*/
@Override
- public void writeDocumentPriorities(long currentTime, DocumentDescription[] documentDescriptions, IPriorityCalculator[] priorities)
+ public void writeDocumentPriorities(DocumentDescription[] documentDescriptions, IPriorityCalculator[] priorities)
throws ManifoldCFException
{
@@ -2107,16 +2073,14 @@ public class JobManager implements IJobM
while (true)
{
// This should be ordered by document identifier hash in order to prevent potential deadlock conditions
- HashMap indexMap = new HashMap();
+ Map<String,Integer> indexMap = new HashMap<String,Integer>();
String[] docIDHashes = new String[documentDescriptions.length];
- int i = 0;
- while (i < documentDescriptions.length)
+ for (int i = 0; i < documentDescriptions.length; i++)
{
String documentIDHash = documentDescriptions[i].getDocumentIdentifierHash() + ":"+documentDescriptions[i].getJobID();
docIDHashes[i] = documentIDHash;
indexMap.put(documentIDHash,new Integer(i));
- i++;
}
java.util.Arrays.sort(docIDHashes);
@@ -2129,18 +2093,16 @@ public class JobManager implements IJobM
{
// Need to order the writes by doc id.
- i = 0;
- while (i < docIDHashes.length)
+ for (int i = 0; i < docIDHashes.length; i++)
{
String docIDHash = docIDHashes[i];
- Integer x = (Integer)indexMap.remove(docIDHash);
+ Integer x = indexMap.remove(docIDHash);
if (x == null)
throw new ManifoldCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
int index = x.intValue();
DocumentDescription dd = documentDescriptions[index];
IPriorityCalculator priority = priorities[index];
- jobQueue.writeDocPriority(currentTime,dd.getID(),priority);
- i++;
+ jobQueue.writeDocPriority(dd.getID(),priority);
}
database.performCommit();
break;
@@ -2470,10 +2432,7 @@ public class JobManager implements IJobM
// But, after I did this, it was no longer necessary to have such a large transaction either.
- // Anything older than 10 minutes ago is considered eligible for reprioritization.
- long prioritizationTime = currentTime - 60000L * 10L;
-
- ThrottleLimit vList = new ThrottleLimit(n,prioritizationTime);
+ ThrottleLimit vList = new ThrottleLimit(n);
IResultSet jobconnections = jobs.getActiveJobConnections();
HashMap connectionSet = new HashMap();
@@ -2755,8 +2714,7 @@ public class JobManager implements IJobM
.append(jobQueue.docIDField).append(",t0.")
.append(jobQueue.statusField).append(",t0.")
.append(jobQueue.failTimeField).append(",t0.")
- .append(jobQueue.failCountField).append(",t0.")
- .append(jobQueue.prioritySetField).append(" FROM ").append(jobQueue.getTableName())
+ .append(jobQueue.failCountField).append(" FROM ").append(jobQueue.getTableName())
.append(" t0 ").append(jobQueue.getGetNextDocumentsIndexHint()).append(" WHERE ");
sb.append(database.buildConjunctionClause(list,new ClauseDescription[]{
@@ -4296,14 +4254,13 @@ public class JobManager implements IJobM
*@param docIDs are the local document identifiers.
*@param overrideSchedule is true if any existing document schedule should be overridden.
*@param hopcountMethod is either accurate, nodelete, or neverdelete.
- *@param currentTime is the current time in milliseconds since epoch.
*@param documentPriorities are the document priorities corresponding to the document identifiers.
*@param prereqEventNames are the events that must be completed before each document can be processed.
*/
@Override
public void addDocumentsInitial(String processID, Long jobID, String[] legalLinkTypes,
String[] docIDHashes, String[] docIDs, boolean overrideSchedule,
- int hopcountMethod, long currentTime, IPriorityCalculator[] documentPriorities,
+ int hopcountMethod, IPriorityCalculator[] documentPriorities,
String[][] prereqEventNames)
throws ManifoldCFException
{
@@ -4390,12 +4347,12 @@ public class JobManager implements IJobM
int status = jobQueue.stringToStatus((String)row.getValue(jobQueue.statusField));
Long checkTimeValue = (Long)row.getValue(jobQueue.checkTimeField);
- jobQueue.updateExistingRecordInitial(rowID,status,checkTimeValue,executeTime,currentTime,docPriority,docPrereqs,processID);
+ jobQueue.updateExistingRecordInitial(rowID,status,checkTimeValue,executeTime,docPriority,docPrereqs,processID);
}
else
{
// Not found. Attempt an insert instead. This may fail due to constraints, but if this happens, the whole transaction will be retried.
- jobQueue.insertNewRecordInitial(jobID,docIDHash,docID,docPriority,executeTime,currentTime,docPrereqs,processID);
+ jobQueue.insertNewRecordInitial(jobID,docIDHash,docID,docPriority,executeTime,docPrereqs,processID);
}
z++;
@@ -4821,7 +4778,7 @@ public class JobManager implements IJobM
String[] docIDHashes, String[] docIDs,
String parentIdentifierHash, String relationshipType,
int hopcountMethod, String[][] dataNames, Object[][][] dataValues,
- long currentTime, IPriorityCalculator[] documentPriorities,
+ IPriorityCalculator[] documentPriorities,
String[][] prereqEventNames)
throws ManifoldCFException
{
@@ -4996,7 +4953,7 @@ public class JobManager implements IJobM
else
{
// Not found. Attempt an insert instead. This may fail due to constraints, but if this happens, the whole transaction will be retried.
- jobQueue.insertNewRecord(jobID,docIDHash,reorderedDocumentIdentifiers[z],reorderedDocumentPriorities[z],0L,currentTime,reorderedDocumentPrerequisites[z]);
+ jobQueue.insertNewRecord(jobID,docIDHash,reorderedDocumentIdentifiers[z],reorderedDocumentPriorities[z],0L,reorderedDocumentPrerequisites[z]);
}
}
@@ -5022,7 +4979,7 @@ public class JobManager implements IJobM
// helps us determine whether we're going to need to "flip" HOPCOUNTREMOVED documents
// to the PENDING state. If the new link ended in an existing record, THEN we need to flip them all!
jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
- 0L,currentTime,carrydownChangesSeen[z] || (hopcountChangesSeen!=null && hopcountChangesSeen[z]),
+ 0L,carrydownChangesSeen[z] || (hopcountChangesSeen!=null && hopcountChangesSeen[z]),
reorderedDocumentPriorities[z],reorderedDocumentPrerequisites[z]);
// Signal if we need to perform the flip
if (hopcountChangesSeen != null && hopcountChangesSeen[z])
@@ -5093,7 +5050,6 @@ public class JobManager implements IJobM
*@param hopcountMethod is the desired method for managing hopcounts.
*@param dataNames are the names of the data to carry down to the child from this parent.
*@param dataValues are the values to carry down to the child from this parent, corresponding to dataNames above.
- *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
*@param priority is the desired document priority for the document.
*@param prereqEventNames are the events that must be completed before the document can be processed.
*/
@@ -5102,13 +5058,13 @@ public class JobManager implements IJobM
Long jobID, String[] legalLinkTypes, String docIDHash, String docID,
String parentIdentifierHash, String relationshipType,
int hopcountMethod, String[] dataNames, Object[][] dataValues,
- long currentTime, IPriorityCalculator priority, String[] prereqEventNames)
+ IPriorityCalculator priority, String[] prereqEventNames)
throws ManifoldCFException
{
addDocuments(processID,jobID,legalLinkTypes,
new String[]{docIDHash},new String[]{docID},
parentIdentifierHash,relationshipType,hopcountMethod,new String[][]{dataNames},
- new Object[][][]{dataValues},currentTime,new IPriorityCalculator[]{priority},new String[][]{prereqEventNames});
+ new Object[][][]{dataValues},new IPriorityCalculator[]{priority},new String[][]{prereqEventNames});
}
/** Undo the addition of child documents to the queue, for a set of documents.
@@ -5460,7 +5416,7 @@ public class JobManager implements IJobM
*@param docPriorities are the document priorities to assign to the documents, if needed.
*/
@Override
- public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, IPriorityCalculator[] docPriorities)
+ public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, IPriorityCalculator[] docPriorities)
throws ManifoldCFException
{
if (documentDescriptions.length == 0)
@@ -5540,7 +5496,7 @@ public class JobManager implements IJobM
if (jr != null)
// It was an existing row; do the update logic; use the 'carrydown changes' flag = true all the time.
jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
- 0L,currentTime,true,docPriorities[originalIndex],null);
+ 0L,true,docPriorities[originalIndex],null);
j++;
}
database.performCommit();
@@ -5578,10 +5534,10 @@ public class JobManager implements IJobM
*@param docPriority is the document priority to assign to the document, if needed.
*/
@Override
- public void carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, IPriorityCalculator docPriority)
+ public void carrydownChangeDocument(DocumentDescription documentDescription, IPriorityCalculator docPriority)
throws ManifoldCFException
{
- carrydownChangeDocumentMultiple(new DocumentDescription[]{documentDescription},currentTime,new IPriorityCalculator[]{docPriority});
+ carrydownChangeDocumentMultiple(new DocumentDescription[]{documentDescription},new IPriorityCalculator[]{docPriority});
}
/** Sleep a random amount of time after a transaction abort.
@@ -8151,6 +8107,7 @@ public class JobManager implements IJobM
// All the job's documents need to have their docpriority set to null, to clear dead wood out of the docpriority index.
// See CONNECTORS-290.
// We do this BEFORE updating the job state.
+
jobQueue.clearDocPriorities(jobID);
IJobDescription jobDesc = jobs.load(jobID,true);
@@ -9448,30 +9405,30 @@ public class JobManager implements IJobM
{
// For each connection, there is (a) a number (which is the maximum per bin), and (b)
// a current running count per bin. These are stored as elements in a hash map.
- protected HashMap connectionMap = new HashMap();
+ protected Map<String,ThrottleJobItem> connectionMap = new HashMap<String,ThrottleJobItem>();
// The maximum number of jobs that have reached their chunk size limit that we
// need
- protected int n;
+ protected final int n;
// This is the hash table that maps a job ID to the object that tracks the number
// of documents already accumulated for this resultset. The count of the number
// of queue records we have is tallied by going through each job in this table
// and adding the records outstanding for it.
- protected HashMap jobQueueHash = new HashMap();
+ protected final Map<Long,QueueHashItem> jobQueueHash = new HashMap<Long,QueueHashItem>();
// This is the map from jobid to connection name
- protected HashMap jobConnection = new HashMap();
+ protected Map<Long,String> jobConnection = new HashMap<Long,String>();
// This is the set of allowed connection names. We discard all documents that are
// not from that set.
- protected HashMap activeConnections = new HashMap();
+ protected Map<String,IRepositoryConnector> activeConnections = new HashMap<String,IRepositoryConnector>();
// This is the number of documents per set per connection.
- protected HashMap setSizes = new HashMap();
+ protected Map<String,Integer> setSizes = new HashMap<String,Integer>();
// These are the individual connection maximums, keyed by connection name.
- protected HashMap maxConnectionCounts = new HashMap();
+ protected Map<String,MutableInteger> maxConnectionCounts = new HashMap<String,MutableInteger>();
// This is the maximum number of documents per set over all the connections we are looking at. This helps us establish a sanity limit.
protected int maxSetSize = 0;
@@ -9480,19 +9437,15 @@ public class JobManager implements IJobM
protected int documentsProcessed = 0;
// This is where we accumulate blocking documents. This is an arraylist of DocumentDescription objects.
- protected ArrayList blockingDocumentArray = new ArrayList();
-
- // Cutoff time for documents eligible for prioritization
- protected long prioritizationTime;
+ protected final List<DocumentDescription> blockingDocumentArray = new ArrayList<DocumentDescription>();
/** Constructor.
* This class is built up piecemeal, so the constructor does nothing.
*@param n is the maximum number of full job descriptions we want at this time.
*/
- public ThrottleLimit(int n, long prioritizationTime)
+ public ThrottleLimit(int n)
{
this.n = n;
- this.prioritizationTime = prioritizationTime;
Logging.perf.debug("Limit instance created");
}
@@ -9541,7 +9494,7 @@ public class JobManager implements IJobM
if (Logging.perf.isDebugEnabled())
Logging.perf.debug(" Adding fetch limit of "+Integer.toString(upperLimit)+" fetches for expression '"+regexp+"' for connection '"+connectionName+"'");
- ThrottleJobItem ji = (ThrottleJobItem)connectionMap.get(connectionName);
+ ThrottleJobItem ji = connectionMap.get(connectionName);
if (ji == null)
{
ji = new ThrottleJobItem();
@@ -9582,7 +9535,7 @@ public class JobManager implements IJobM
/** Make a deep copy */
public ThrottleLimit makeDeepCopy()
{
- ThrottleLimit rval = new ThrottleLimit(n,prioritizationTime);
+ ThrottleLimit rval = new ThrottleLimit(n);
// Create a true copy of all the structures in which counts are kept. The referential structures (e.g. connection hashes)
// do not need a deep copy.
rval.activeConnections = activeConnections;
@@ -9592,18 +9545,13 @@ public class JobManager implements IJobM
rval.jobConnection = jobConnection;
// The structures where counts are maintained DO need a deep copy.
rval.documentsProcessed = documentsProcessed;
- Iterator iter;
- iter = connectionMap.keySet().iterator();
- while (iter.hasNext())
+ for (String key : connectionMap.keySet())
{
- Object key = iter.next();
- rval.connectionMap.put(key,((ThrottleJobItem)connectionMap.get(key)).duplicate());
+ rval.connectionMap.put(key,connectionMap.get(key).duplicate());
}
- iter = jobQueueHash.keySet().iterator();
- while (iter.hasNext())
+ for (Long key : jobQueueHash.keySet())
{
- Object key = iter.next();
- rval.jobQueueHash.put(key,((QueueHashItem)jobQueueHash.get(key)).duplicate());
+ rval.jobQueueHash.put(key,jobQueueHash.get(key).duplicate());
}
return rval;
}
@@ -9651,13 +9599,13 @@ public class JobManager implements IJobM
Long jobIDValue = (Long)row.getValue(JobQueue.jobIDField);
// Get the connection name for this row
- String connectionName = (String)jobConnection.get(jobIDValue);
+ String connectionName = jobConnection.get(jobIDValue);
if (connectionName == null)
{
Logging.perf.debug(" Row does not have an eligible job - excluding");
return false;
}
- IRepositoryConnector connectorInstance = (IRepositoryConnector)activeConnections.get(connectionName);
+ IRepositoryConnector connectorInstance = activeConnections.get(connectionName);
if (connectorInstance == null)
{
Logging.perf.debug(" Row does not have an eligible connector instance - excluding");
@@ -9665,7 +9613,7 @@ public class JobManager implements IJobM
}
// Find the connection limit for this document
- MutableInteger connectionLimit = (MutableInteger)maxConnectionCounts.get(connectionName);
+ MutableInteger connectionLimit = maxConnectionCounts.get(connectionName);
if (connectionLimit != null)
{
if (connectionLimit.intValue() == 0)
@@ -9677,11 +9625,11 @@ public class JobManager implements IJobM
}
// Tally this item in the job queue hash, so we can detect when to stop
- QueueHashItem queueItem = (QueueHashItem)jobQueueHash.get(jobIDValue);
+ QueueHashItem queueItem = jobQueueHash.get(jobIDValue);
if (queueItem == null)
{
// Need to talk to the connector to get a max number of docs per chunk
- int maxCount = ((Integer)setSizes.get(connectionName)).intValue();
+ int maxCount = setSizes.get(connectionName).intValue();
queueItem = new QueueHashItem(maxCount);
jobQueueHash.put(jobIDValue,queueItem);
@@ -9697,7 +9645,7 @@ public class JobManager implements IJobM
documentsProcessed++;
//scanRecord.addBins(binNames);
- ThrottleJobItem item = (ThrottleJobItem)connectionMap.get(connectionName);
+ ThrottleJobItem item = connectionMap.get(connectionName);
// If there is no schedule-based throttling on this connection, we're done.
if (item == null)
@@ -9716,8 +9664,11 @@ public class JobManager implements IJobM
if (Logging.perf.isDebugEnabled())
Logging.perf.debug(" Bin "+binNames[j]+" has no more available fetches - excluding");
- Object o = row.getValue(JobQueue.prioritySetField);
- if (o == null || ((Long)o).longValue() <= prioritizationTime)
+ //???
+ //Object o = row.getValue(JobQueue.prioritySetField);
+ //if (o == null || ((Long)o).longValue() <= prioritizationTime)
+ // Fully enabling blocking document logic, for now.
+ if (true)
{
// Need to add a document descriptor based on this row to the blockingDocuments object!
// This will cause it to be reprioritized preferentially, getting it out of the way if it shouldn't
@@ -9752,12 +9703,10 @@ public class JobManager implements IJobM
return false;
// If the number of chunks exceeds n, we are done
- Iterator iter = jobQueueHash.keySet().iterator();
int count = 0;
- while (iter.hasNext())
+ for (Long jobID : jobQueueHash.keySet())
{
- Long jobID = (Long)iter.next();
- QueueHashItem item = (QueueHashItem)jobQueueHash.get(jobID);
+ QueueHashItem item = jobQueueHash.get(jobID);
count += item.getChunkCount();
if (count > n)
return false;
@@ -9774,7 +9723,7 @@ public class JobManager implements IJobM
protected static class QueueHashItem
{
// The number of items per chunk for this job
- int itemsPerChunk;
+ final int itemsPerChunk;
// The number of chunks so far, INCLUDING incomplete chunks
int chunkCount = 0;
// The number of documents in the current incomplete chunk
@@ -9826,10 +9775,10 @@ public class JobManager implements IJobM
protected static class ThrottleJobItem
{
/** These are the bin limits. This is an array of ThrottleLimitSpec objects. */
- protected ArrayList throttleLimits = new ArrayList();
+ protected List<ThrottleLimitSpec> throttleLimits = new ArrayList<ThrottleLimitSpec>();
/** This is a map of the bins and their current counts. If an entry doesn't exist, it's considered to be
* the same as maxBinCount. */
- protected HashMap binCounts = new HashMap();
+ protected final Map<String,MutableInteger> binCounts = new HashMap<String,MutableInteger>();
/** Constructor. */
public ThrottleJobItem()
@@ -9859,11 +9808,9 @@ public class JobManager implements IJobM
{
ThrottleJobItem rval = new ThrottleJobItem();
rval.throttleLimits = throttleLimits;
- Iterator iter = binCounts.keySet().iterator();
- while (iter.hasNext())
+ for (String key : binCounts.keySet())
{
- String key = (String)iter.next();
- this.binCounts.put(key,((MutableInteger)binCounts.get(key)).duplicate());
+ this.binCounts.put(key,binCounts.get(key).duplicate());
}
return rval;
}
@@ -9874,7 +9821,7 @@ public class JobManager implements IJobM
*/
public boolean isEmpty(String binName)
{
- MutableInteger value = (MutableInteger)binCounts.get(binName);
+ MutableInteger value = binCounts.get(binName);
int remaining;
if (value == null)
{
@@ -9893,7 +9840,7 @@ public class JobManager implements IJobM
*/
public void decrement(String binName)
{
- MutableInteger value = (MutableInteger)binCounts.get(binName);
+ MutableInteger value = binCounts.get(binName);
if (value == null)
{
int x = findMaxCount(binName);
@@ -9938,7 +9885,7 @@ public class JobManager implements IJobM
int i = 0;
while (i < throttleLimits.size())
{
- ThrottleLimitSpec spec = (ThrottleLimitSpec)throttleLimits.get(i++);
+ ThrottleLimitSpec spec = throttleLimits.get(i++);
Pattern p = spec.getRegexp();
Matcher m = p.matcher(binName);
if (m.find())
@@ -9957,9 +9904,9 @@ public class JobManager implements IJobM
protected static class ThrottleLimitSpec
{
/** Regexp */
- protected Pattern regexp;
+ protected final Pattern regexp;
/** The fetch limit for all bins matching that regexp */
- protected int maxCount;
+ protected final int maxCount;
/** Constructor */
public ThrottleLimitSpec(String regexp, int maxCount)
Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Nov 10 13:00:51 2014
@@ -41,10 +41,10 @@ import java.util.*;
* <tr><td>status</td><td>CHAR(1)</td><td></td></tr>
* <tr><td>isseed</td><td>CHAR(1)</td><td></td></tr>
* <tr><td>docpriority</td><td>FLOAT</td><td></td></tr>
- * <tr><td>priorityset</td><td>BIGINT</td><td></td></tr>
* <tr><td>checkaction</td><td>CHAR(1)</td><td></td></tr>
* <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
* <tr><td>seedingprocessid</td><td>VARCHAR(16)</td><td></td></tr>
+ * <tr><td>needpriority</td><td>CHAR(1)</td><td></td></tr>
* </table>
* <br><br>
*
@@ -114,10 +114,10 @@ public class JobQueue extends org.apache
public static final String failCountField = "failcount";
public static final String isSeedField = "isseed";
public static final String docPriorityField = "docpriority";
- public static final String prioritySetField = "priorityset";
public static final String checkActionField = "checkaction";
public static final String processIDField = "processid";
public static final String seedingProcessIDField = "seedingprocessid";
+ public static final String needPriorityField = "needpriority";
public static final double noDocPriorityValue = 1e9;
public static final Double nullDocPriority = new Double(noDocPriorityValue + 1.0);
@@ -205,10 +205,10 @@ public class JobQueue extends org.apache
map.put(statusField,new ColumnDescription("CHAR(1)",false,false,null,null,false));
map.put(isSeedField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
map.put(docPriorityField,new ColumnDescription("FLOAT",false,true,null,null,false));
- map.put(prioritySetField,new ColumnDescription("BIGINT",false,false,null,null,false));
map.put(checkActionField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
map.put(seedingProcessIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
+ map.put(needPriorityField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
performCreate(map,null);
}
else
@@ -225,7 +225,7 @@ public class JobQueue extends org.apache
IndexDescription jobSeedIndex = new IndexDescription(false,new String[]{isSeedField,jobIDField});
IndexDescription failTimeIndex = new IndexDescription(false,new String[]{failTimeField,jobIDField});
IndexDescription actionTimeStatusIndex = new IndexDescription(false,new String[]{statusField,checkActionField,checkTimeField});
- IndexDescription prioritysetStatusIndex = new IndexDescription(false,new String[]{statusField,prioritySetField});
+ IndexDescription needPriorityIndex = new IndexDescription(false,new String[]{needPriorityField});
// No evidence that the extra fields help at all, for any database...
IndexDescription docpriorityIndex = new IndexDescription(false,new String[]{docPriorityField,statusField,checkActionField,checkTimeField});
@@ -239,6 +239,8 @@ public class JobQueue extends org.apache
if (uniqueIndex != null && id.equals(uniqueIndex))
uniqueIndex = null;
+ else if (needPriorityIndex != null && id.equals(needPriorityIndex))
+ needPriorityIndex = null;
else if (jobStatusIndex != null && id.equals(jobStatusIndex))
jobStatusIndex = null;
else if (jobSeedIndex != null && id.equals(jobSeedIndex))
@@ -247,8 +249,6 @@ public class JobQueue extends org.apache
failTimeIndex = null;
else if (actionTimeStatusIndex != null && id.equals(actionTimeStatusIndex))
actionTimeStatusIndex = null;
- else if (prioritysetStatusIndex != null && id.equals(prioritysetStatusIndex))
- prioritysetStatusIndex = null;
else if (docpriorityIndex != null && id.equals(docpriorityIndex))
docpriorityIndex = null;
else if (indexName.indexOf("_pkey") == -1)
@@ -261,6 +261,9 @@ public class JobQueue extends org.apache
if (jobStatusIndex != null)
performAddIndex(null,jobStatusIndex);
+ if (needPriorityIndex != null)
+ performAddIndex(null,needPriorityIndex);
+
if (jobSeedIndex != null)
performAddIndex(null,jobSeedIndex);
@@ -270,9 +273,6 @@ public class JobQueue extends org.apache
if (actionTimeStatusIndex != null)
performAddIndex(null,actionTimeStatusIndex);
- if (prioritysetStatusIndex != null)
- performAddIndex(null,prioritysetStatusIndex);
-
if (docpriorityIndex != null)
performAddIndex(null,docpriorityIndex);
@@ -733,7 +733,7 @@ public class JobQueue extends org.apache
// Map COMPLETE to PENDINGPURGATORY
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
- map.put(prioritySetField,new Long(0L));
+ map.put(needPriorityField,needPriorityToString(true));
// Do not reset priorities here! They should all be blank at this point.
map.put(checkTimeField,new Long(0L));
map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -791,7 +791,7 @@ public class JobQueue extends org.apache
// Map COMPLETE to PENDINGPURGATORY.
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
- map.put(prioritySetField,new Long(0L));
+ map.put(needPriorityField,needPriorityToString(true));
// Do not reset priorities here! They should all be blank at this point.
map.put(checkTimeField,new Long(0L));
map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -866,11 +866,11 @@ public class JobQueue extends org.apache
}
/** Write out a document priority */
- public void writeDocPriority(long currentTime, Long rowID, IPriorityCalculator priority)
+ public void writeDocPriority(Long rowID, IPriorityCalculator priority)
throws ManifoldCFException
{
HashMap map = new HashMap();
- map.put(prioritySetField,new Long(currentTime));
+ map.put(needPriorityField,needPriorityToString(false));
map.put(docPriorityField,new Double(priority.getDocumentPriority()));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -879,12 +879,12 @@ public class JobQueue extends org.apache
noteModifications(0,1,0);
}
- /** Clear all document priorities for a job */
+ /** Clear all document priorities for a job that is going to sleep */
public void clearDocPriorities(Long jobID)
throws ManifoldCFException
{
HashMap map = new HashMap();
- map.put(prioritySetField,new Long(0L));
+ map.put(needPriorityField,needPriorityToString(false));
map.put(docPriorityField,nullDocPriority);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -913,7 +913,7 @@ public class JobQueue extends org.apache
checkTimeValue = null;
// Remove document priority; we don't want to pollute the queue. See CONNECTORS-290.
map.put(docPriorityField,nullDocPriority);
- map.put(prioritySetField,new Long(0L));
+ map.put(needPriorityField,needPriorityToString(false));
break;
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
@@ -1186,7 +1186,7 @@ public class JobQueue extends org.apache
* The record is presumed to exist and have been locked, via "FOR UPDATE".
*/
public void updateExistingRecordInitial(Long recordID, int currentStatus, Long checkTimeValue,
- long desiredExecuteTime, long currentTime, IPriorityCalculator desiredPriority, String[] prereqEvents,
+ long desiredExecuteTime, IPriorityCalculator desiredPriority, String[] prereqEvents,
String processID)
throws ManifoldCFException
{
@@ -1225,7 +1225,7 @@ public class JobQueue extends org.apache
map.put(failCountField,null);
// Update the doc priority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(prioritySetField,new Long(currentTime));
+ map.put(needPriorityField,needPriorityToString(false));
break;
case STATUS_PENDING:
@@ -1282,7 +1282,7 @@ public class JobQueue extends org.apache
*@param docID is the local document identifier.
*/
public void insertNewRecordInitial(Long jobID, String docHash, String docID, IPriorityCalculator desiredDocPriority,
- long desiredExecuteTime, long currentTime, String[] prereqEvents, String processID)
+ long desiredExecuteTime, String[] prereqEvents, String processID)
throws ManifoldCFException
{
// No prerequisites should be possible at this point.
@@ -1302,7 +1302,7 @@ public class JobQueue extends org.apache
map.put(seedingProcessIDField,processID);
// Set the document priority
map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
- map.put(prioritySetField,new Long(currentTime));
+ map.put(needPriorityField,needPriorityToString(false));
performInsert(map,null);
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(1,0,0);
@@ -1483,7 +1483,7 @@ public class JobQueue extends org.apache
* The record is presumed to exist and have been locked, via "FOR UPDATE".
*/
public void updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
- long desiredExecuteTime, long currentTime, boolean otherChangesSeen,
+ long desiredExecuteTime, boolean otherChangesSeen,
IPriorityCalculator desiredPriority, String[] prereqEvents)
throws ManifoldCFException
{
@@ -1501,7 +1501,7 @@ public class JobQueue extends org.apache
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(prioritySetField,new Long(currentTime));
+ map.put(needPriorityField,needPriorityToString(false));
break;
case STATUS_COMPLETE:
case STATUS_BEINGCLEANED:
@@ -1518,7 +1518,7 @@ public class JobQueue extends org.apache
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(prioritySetField,new Long(currentTime));
+ map.put(needPriorityField,needPriorityToString(false));
break;
}
return;
@@ -1547,7 +1547,7 @@ public class JobQueue extends org.apache
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(prioritySetField,new Long(currentTime));
+ map.put(needPriorityField,needPriorityToString(false));
break;
}
return;
@@ -1571,7 +1571,7 @@ public class JobQueue extends org.apache
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(prioritySetField,new Long(currentTime));
+ map.put(needPriorityField,needPriorityToString(false));
break;
}
return;
@@ -1613,7 +1613,7 @@ public class JobQueue extends org.apache
*
*/
public void insertNewRecord(Long jobID, String docIDHash, String docID, IPriorityCalculator desiredDocPriority, long desiredExecuteTime,
- long currentTime, String[] prereqEvents)
+ String[] prereqEvents)
throws ManifoldCFException
{
HashMap map = new HashMap();
@@ -1627,7 +1627,7 @@ public class JobQueue extends org.apache
map.put(statusField,statusToString(STATUS_PENDING));
// Be sure to set the priority also
map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
- map.put(prioritySetField,new Long(currentTime));
+ map.put(needPriorityField,needPriorityToString(false));
performInsert(map,null);
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(1,0,0);
@@ -1694,6 +1694,26 @@ public class JobQueue extends org.apache
}
}
+ /** Convert need priority value to boolean: true only for the string "T"; null or any other value gives false.
+ */
+ public static boolean stringToNeedPriority(String value)
+ throws ManifoldCFException
+ {
+ if (value != null && value.equals("T"))
+ return true;
+ return false;
+ }
+
+ /** Convert boolean to need priority value: "T" for true, "F" for false.
+ */
+ public static String needPriorityToString(boolean value)
+ throws ManifoldCFException
+ {
+ if (value)
+ return "T";
+ return "F";
+ }
+
/** Convert status field value to integer.
*@param value is the string.
*@return the integer.
Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java Mon Nov 10 13:00:51 2014
@@ -188,6 +188,7 @@ public class CrawlerAgent implements IAg
// activity.
// In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
// priorities.
+ // ??? -- start the process of reprioritization ONLY; don't do the whole thing.
while (true)
{
long startTime = System.currentTimeMillis();
@@ -200,12 +201,12 @@ public class CrawlerAgent implements IAg
}
long updateTime = currentTimeValue.longValue();
- DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+ DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(10000);
if (docs.length == 0)
break;
// Calculate new priorities for all these documents
- ManifoldCF.writeDocumentPriorities(threadContext,docs,connectionMap,jobDescriptionMap,updateTime);
+ ManifoldCF.writeDocumentPriorities(threadContext,docs,connectionMap,jobDescriptionMap);
Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
}
Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Mon Nov 10 13:00:51 2014
@@ -946,7 +946,7 @@ public class ManifoldCF extends org.apac
}
// Now, requeue the documents with the new priorities
- jobManager.carrydownChangeDocumentMultiple(requeueCandidates,currentTime,docPriorities);
+ jobManager.carrydownChangeDocumentMultiple(requeueCandidates,docPriorities);
}
/** Stuff colons so we can't have conflicts. */
@@ -1014,6 +1014,7 @@ public class ManifoldCF extends org.apac
// activity.
// In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
// priorities.
+ // ???? Should only start the process of reprioritization, not complete it.
while (true)
{
long startTime = System.currentTimeMillis();
@@ -1026,12 +1027,12 @@ public class ManifoldCF extends org.apac
}
long updateTime = currentTimeValue.longValue();
- DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+ DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(10000);
if (docs.length == 0)
break;
// Calculate new priorities for all these documents
- writeDocumentPriorities(threadContext,docs,connectionMap,jobDescriptionMap,updateTime);
+ writeDocumentPriorities(threadContext,docs,connectionMap,jobDescriptionMap);
Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
}
@@ -1042,8 +1043,7 @@ public class ManifoldCF extends org.apac
/** Write a set of document priorities, based on the current queue tracker.
*/
public static void writeDocumentPriorities(IThreadContext threadContext, DocumentDescription[] descs,
- Map<String,IRepositoryConnection> connectionMap, Map<Long,IJobDescription> jobDescriptionMap,
- long currentTime)
+ Map<String,IRepositoryConnection> connectionMap, Map<Long,IJobDescription> jobDescriptionMap)
throws ManifoldCFException
{
IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
@@ -1128,7 +1128,7 @@ public class ManifoldCF extends org.apac
rt.preloadBinValues();
// Now, write all the priorities we can.
- jobManager.writeDocumentPriorities(currentTime,descs,priorities);
+ jobManager.writeDocumentPriorities(descs,priorities);
rt.clearPreloadedValues();
}
Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java Mon Nov 10 13:00:51 2014
@@ -215,22 +215,24 @@ public class SeedingActivity implements
throws ManifoldCFException
{
// First, prioritize the documents using the queue tracker
- long prioritizationTime = System.currentTimeMillis();
IPriorityCalculator[] docPriorities = new IPriorityCalculator[docIDHashes.length];
- int i = 0;
- while (i < docIDHashes.length)
+ rt.clearPreloadRequests();
+
+ for (int i = 0 ; i < docIDHashes.length ; i++)
{
// Calculate desired document priority based on current queuetracker status.
String[] bins = connector.getBinNames(docIDs[i]);
- docPriorities[i] = new PriorityCalculator(rt,connection,bins);
-
- i++;
+ PriorityCalculator p = new PriorityCalculator(rt,connection,bins);
+ docPriorities[i] = p;
+ p.makePreloadRequest();
}
+ rt.preloadBinValues();
+
jobManager.addDocumentsInitial(processID,
jobID,legalLinkTypes,docIDHashes,docIDs,overrideSchedule,hopcountMethod,
- prioritizationTime,docPriorities,prereqEventNames);
+ docPriorities,prereqEventNames);
}
Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java Mon Nov 10 13:00:51 2014
@@ -121,29 +121,29 @@ public class SetPriorityThread extends T
}
// Cycle through the current list of stuffer-identified documents until we come to the end. Reprioritize these
- // first.
+ // first. NOTE: These documents will already have document priorities.
DocumentDescription desc = blockingDocuments.getBlockingDocument();
if (desc != null)
{
ManifoldCF.writeDocumentPriorities(threadContext,
- new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,currentTime);
+ new DocumentDescription[]{desc},connectionMap,jobDescriptionMap);
processedCount++;
continue;
}
- /* no longer useful given current architecture; only need to reprioritize blocking documents
+
// Grab a list of document identifiers to set priority on.
// We may well wind up calculating priority for documents that wind up having their
// state changed before we can write back, but this is okay because update is only
// going to be permitted for rows that still have the right state.
- // I found that a limit of 1000 causes postgresql to basically do a linear scan, while a limit of 20 does not!
- DocumentDescription[] descs = jobManager.getNextReprioritizationDocuments(currentTime,20);
+ DocumentDescription[] descs = jobManager.getNextNotYetProcessedReprioritizationDocuments(1000);
if (descs.length > 0)
{
- writePriorities(threadContext,mgr,jobManager,descs,connectionMap,jobDescriptionMap,currentTime);
+ ManifoldCF.writeDocumentPriorities(threadContext,
+ descs,connectionMap,jobDescriptionMap);
processedCount += descs.length;
continue;
}
- */
+
Logging.threads.debug("Done reprioritizing because no more documents to reprioritize");
ManifoldCF.sleep(30000L);
break;
Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Mon Nov 10 13:00:51 2014
@@ -1844,7 +1844,7 @@ public class WorkerThread extends Thread
jobManager.addDocuments(processID,
jobID,legalLinkTypes,docidHashes,docids,db.getParentIdentifierHash(),db.getLinkType(),hopcountMode,
- dataNames,dataValues,currentTime,priorities,eventNames);
+ dataNames,dataValues,priorities,eventNames);
rt.clearPreloadedValues();
}