You are viewing a plain-text version of this content. (The canonical link, shown as "here" on the original page, was not preserved in this plain-text copy.)
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/02 12:27:36 UTC
svn commit: r1546965 [2/2] - in /manifoldcf/trunk: ./ framework/
framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
framework/agents/src/main/java/org/apache/manifoldcf/agents/system/
framework/pull-agent/src/main/java/org/apache/m...
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java Mon Dec 2 11:27:35 2013
@@ -31,7 +31,6 @@ public class CrawlerAgent implements IAg
// Thread objects.
// These get filled in as threads are created.
- protected InitializationThread initializationThread = null;
protected JobStartThread jobStartThread = null;
protected StufferThread stufferThread = null;
protected FinisherThread finisherThread = null;
@@ -140,13 +139,20 @@ public class CrawlerAgent implements IAg
* Call this method to clean up dangling persistent state when a cluster is just starting
* to come up. This method CANNOT be called when there are any active agents
* processes at all.
+ *@param processID is the current process ID.
*/
@Override
- public void cleanUpAgentData(IThreadContext threadContext)
+ public void cleanUpAllAgentData(IThreadContext threadContext, String currentProcessID)
throws ManifoldCFException
{
IJobManager jobManager = JobManagerFactory.make(threadContext);
jobManager.cleanupProcessData();
+ // What kind of reprioritization should be done here?
+ // Answer: since we basically keep everything in the database now, the only kind of reprioritization we need
+ // to take care of are dangling ones that won't get done because the process that was doing them went
+ // away. BUT: somebody may have blown away lock info, in which case we won't know anything at all.
+ // So we do everything in that case.
+ ManifoldCF.resetAllDocumentPriorities(threadContext,System.currentTimeMillis(),currentProcessID);
}
/** Cleanup after agents process.
@@ -154,14 +160,58 @@ public class CrawlerAgent implements IAg
* This method CANNOT be called when the agent is active, but it can
* be called at any time and by any process in order to guarantee that a terminated
* agent does not block other agents from completing their tasks.
- *@param processID is the process ID of the agent to clean up after.
+ *@param currentProcessID is the current process ID.
+ *@param cleanupProcessID is the process ID of the agent to clean up after.
*/
@Override
- public void cleanUpAgentData(IThreadContext threadContext, String processID)
+ public void cleanUpAgentData(IThreadContext threadContext, String currentProcessID, String cleanupProcessID)
throws ManifoldCFException
{
IJobManager jobManager = JobManagerFactory.make(threadContext);
- jobManager.cleanupProcessData(processID);
+ jobManager.cleanupProcessData(cleanupProcessID);
+ ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
+ String reproID = rt.isSpecifiedProcessReprioritizing(cleanupProcessID);
+ if (reproID != null)
+ {
+ // We have to take over the prioritization for the process, which apparently died
+ // in the middle.
+ IRepositoryConnectionManager connectionManager = RepositoryConnectionManagerFactory.make(threadContext);
+
+ // Reprioritize all documents in the jobqueue, 1000 at a time
+
+ Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
+ Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+
+ // Do the 'not yet processed' documents only. Documents that are queued for reprocessing will be assigned
+ // new priorities. Already processed documents won't. This guarantees that our bins are appropriate for current thread
+ // activity.
+ // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
+ // priorities.
+ while (true)
+ {
+ long startTime = System.currentTimeMillis();
+
+ Long currentTimeValue = rt.checkReprioritizationInProgress();
+ if (currentTimeValue == null)
+ {
+ // Some other process or thread superceded us.
+ return;
+ }
+ long updateTime = currentTimeValue.longValue();
+
+ DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+ if (docs.length == 0)
+ break;
+
+ // Calculate new priorities for all these documents
+ ManifoldCF.writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,
+ rt,updateTime);
+
+ Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
+ }
+
+ rt.doneReprioritization(reproID);
+ }
}
/** Start the agent. This method should spin up the agent threads, and
@@ -277,14 +327,14 @@ public class CrawlerAgent implements IAg
docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue,processID);
jobStartThread = new JobStartThread(processID);
- startupThread = new StartupThread(queueTracker,new StartupResetManager(processID),processID);
+ startupThread = new StartupThread(new StartupResetManager(processID),processID);
startDeleteThread = new StartDeleteThread(new DeleteStartupResetManager(processID),processID);
finisherThread = new FinisherThread(processID);
notificationThread = new JobNotificationThread(new NotificationResetManager(processID),processID);
jobDeleteThread = new JobDeleteThread(processID);
stufferThread = new StufferThread(documentQueue,numWorkerThreads,workerResetManager,queueTracker,blockingDocuments,lowWaterFactor,stuffAmtFactor,processID);
expireStufferThread = new ExpireStufferThread(expireQueue,numExpireThreads,workerResetManager,processID);
- setPriorityThread = new SetPriorityThread(queueTracker,numWorkerThreads,blockingDocuments,processID);
+ setPriorityThread = new SetPriorityThread(numWorkerThreads,blockingDocuments,processID);
historyCleanupThread = new HistoryCleanupThread(processID);
workerThreads = new WorkerThread[numWorkerThreads];
@@ -299,7 +349,7 @@ public class CrawlerAgent implements IAg
i = 0;
while (i < numExpireThreads)
{
- expireThreads[i] = new ExpireThread(Integer.toString(i),expireQueue,queueTracker,workerResetManager,processID);
+ expireThreads[i] = new ExpireThread(Integer.toString(i),expireQueue,workerResetManager,processID);
i++;
}
@@ -317,143 +367,61 @@ public class CrawlerAgent implements IAg
i = 0;
while (i < numCleanupThreads)
{
- cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,queueTracker,docCleanupResetManager,processID);
+ cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,docCleanupResetManager,processID);
i++;
}
- jobResetThread = new JobResetThread(queueTracker,processID);
- seedingThread = new SeedingThread(queueTracker,new SeedingResetManager(processID),processID);
+ jobResetThread = new JobResetThread(processID);
+ seedingThread = new SeedingThread(new SeedingResetManager(processID),processID);
idleCleanupThread = new IdleCleanupThread(processID);
- initializationThread = new InitializationThread(queueTracker);
- // Start the initialization thread. This does the initialization work and starts all the other threads when that's done. It then exits.
- initializationThread.start();
- Logging.root.info("Pull-agent started");
- }
+ // Start all the threads
+ jobStartThread.start();
+ startupThread.start();
+ startDeleteThread.start();
+ finisherThread.start();
+ notificationThread.start();
+ jobDeleteThread.start();
+ stufferThread.start();
+ expireStufferThread.start();
+ setPriorityThread.start();
+ historyCleanupThread.start();
- protected class InitializationThread extends Thread
- {
-
- protected final QueueTracker queueTracker;
-
- public InitializationThread(QueueTracker queueTracker)
+ i = 0;
+ while (i < numWorkerThreads)
{
- super();
- this.queueTracker = queueTracker;
- setName("Initialization thread");
- setDaemon(true);
+ workerThreads[i].start();
+ i++;
}
- public void run()
+ i = 0;
+ while (i < numExpireThreads)
{
- int i;
-
- try
- {
- IThreadContext threadContext = ThreadContextFactory.make();
-
- // First, get a job manager
- IJobManager jobManager = JobManagerFactory.make(threadContext);
- IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
-
- /* No longer needed, because IAgents specifically initializes/cleans up.
-
- Logging.threads.debug("Agents process starting initialization...");
-
- // Call the database to get it ready
- jobManager.prepareForStart();
- */
-
- Logging.threads.debug("Agents process reprioritizing documents...");
-
- Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
- Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
- // Reprioritize all documents in the jobqueue, 1000 at a time
- long currentTime = System.currentTimeMillis();
-
- // Do the 'not yet processed' documents only. Documents that are queued for reprocessing will be assigned
- // new priorities. Already processed documents won't. This guarantees that our bins are appropriate for current thread
- // activity.
- // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
- // priorities.
- while (true)
- {
- long startTime = System.currentTimeMillis();
-
- DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime, 10000);
- if (docs.length == 0)
- break;
-
- // Calculate new priorities for all these documents
- ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,docs,connectionMap,jobDescriptionMap,
- queueTracker,currentTime);
-
- Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
- }
-
- Logging.threads.debug("Agents process initialization complete!");
-
- // Start all the threads
- jobStartThread.start();
- startupThread.start();
- startDeleteThread.start();
- finisherThread.start();
- notificationThread.start();
- jobDeleteThread.start();
- stufferThread.start();
- expireStufferThread.start();
- setPriorityThread.start();
- historyCleanupThread.start();
-
- i = 0;
- while (i < numWorkerThreads)
- {
- workerThreads[i].start();
- i++;
- }
+ expireThreads[i].start();
+ i++;
+ }
- i = 0;
- while (i < numExpireThreads)
- {
- expireThreads[i].start();
- i++;
- }
+ cleanupStufferThread.start();
+ i = 0;
+ while (i < numCleanupThreads)
+ {
+ cleanupThreads[i].start();
+ i++;
+ }
- cleanupStufferThread.start();
- i = 0;
- while (i < numCleanupThreads)
- {
- cleanupThreads[i].start();
- i++;
- }
+ deleteStufferThread.start();
+ i = 0;
+ while (i < numDeleteThreads)
+ {
+ deleteThreads[i].start();
+ i++;
+ }
- deleteStufferThread.start();
- i = 0;
- while (i < numDeleteThreads)
- {
- deleteThreads[i].start();
- i++;
- }
+ jobResetThread.start();
+ seedingThread.start();
+ idleCleanupThread.start();
- jobResetThread.start();
- seedingThread.start();
- idleCleanupThread.start();
- // exit!
- }
- catch (Throwable e)
- {
- // Severe error on initialization
- if (e instanceof ManifoldCFException)
- {
- // Deal with interrupted exception gracefully, because it means somebody is trying to shut us down before we got started.
- if (((ManifoldCFException)e).getErrorCode() == ManifoldCFException.INTERRUPTED)
- return;
- }
- System.err.println("agents process could not start - shutting down");
- Logging.threads.fatal("Startup initialization error tossed: "+e.getMessage(),e);
- System.exit(-300);
- }
- }
+ Logging.root.info("Pull-agent started");
}
/** Stop the system.
@@ -462,7 +430,7 @@ public class CrawlerAgent implements IAg
throws ManifoldCFException
{
Logging.root.info("Shutting down pull-agent...");
- while (initializationThread != null || jobDeleteThread != null || startupThread != null || startDeleteThread != null ||
+ while (jobDeleteThread != null || startupThread != null || startDeleteThread != null ||
jobStartThread != null || stufferThread != null ||
finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null || expireThreads != null ||
deleteStufferThread != null || deleteThreads != null ||
@@ -472,10 +440,6 @@ public class CrawlerAgent implements IAg
// Send an interrupt to all threads that are still there.
// In theory, this only needs to be done once. In practice, I have seen cases where the thread loses track of the fact that it has been
// interrupted (which may be a JVM bug - who knows?), but in any case there's no harm in doing it again.
- if (initializationThread != null)
- {
- initializationThread.interrupt();
- }
if (historyCleanupThread != null)
{
historyCleanupThread.interrupt();
@@ -587,11 +551,6 @@ public class CrawlerAgent implements IAg
}
// Check to see which died.
- if (initializationThread != null)
- {
- if (!initializationThread.isAlive())
- initializationThread = null;
- }
if (historyCleanupThread != null)
{
if (!historyCleanupThread.isAlive())
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Mon Dec 2 11:27:35 2013
@@ -50,8 +50,6 @@ public class DocumentCleanupThread exten
protected final DocumentCleanupQueue documentCleanupQueue;
/** Delete thread pool reset manager */
protected final DocCleanupResetManager resetManager;
- /** Queue tracker */
- protected final QueueTracker queueTracker;
/** Process ID */
protected final String processID;
@@ -59,13 +57,12 @@ public class DocumentCleanupThread exten
*@param id is the worker thread id.
*/
public DocumentCleanupThread(String id, DocumentCleanupQueue documentCleanupQueue,
- QueueTracker queueTracker, DocCleanupResetManager resetManager, String processID)
+ DocCleanupResetManager resetManager, String processID)
throws ManifoldCFException
{
super();
this.id = id;
this.documentCleanupQueue = documentCleanupQueue;
- this.queueTracker = queueTracker;
this.resetManager = resetManager;
this.processID = processID;
setName("Document cleanup thread '"+id+"'");
@@ -83,6 +80,7 @@ public class DocumentCleanupThread exten
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+ ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
// Loop
while (true)
@@ -239,7 +237,7 @@ public class DocumentCleanupThread exten
DocumentDescription[] requeueCandidates = jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,ddd,hopcountMethod);
// Use the common method for doing the requeuing
ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
- connector,connection,queueTracker,currentTime);
+ connector,connection,rt,currentTime);
// Finally, completed expiration of the document.
dqd.setProcessed();
}
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Mon Dec 2 11:27:35 2013
@@ -39,22 +39,19 @@ public class ExpireThread extends Thread
protected final DocumentCleanupQueue documentQueue;
/** Worker thread pool reset manager */
protected final WorkerResetManager resetManager;
- /** Queue tracker */
- protected final QueueTracker queueTracker;
/** Process ID */
protected final String processID;
/** Constructor.
*@param id is the expire thread id.
*/
- public ExpireThread(String id, DocumentCleanupQueue documentQueue, QueueTracker queueTracker, WorkerResetManager resetManager, String processID)
+ public ExpireThread(String id, DocumentCleanupQueue documentQueue, WorkerResetManager resetManager, String processID)
throws ManifoldCFException
{
super();
this.id = id;
this.documentQueue = documentQueue;
this.resetManager = resetManager;
- this.queueTracker = queueTracker;
this.processID = processID;
setName("Expiration thread '"+id+"'");
setDaemon(true);
@@ -73,6 +70,7 @@ public class ExpireThread extends Thread
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+ ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
// Loop
while (true)
@@ -242,7 +240,7 @@ public class ExpireThread extends Thread
DocumentDescription[] requeueCandidates = jobManager.markDocumentExpired(jobID,legalLinkTypes,ddd,hopcountMethod);
// Use the common method for doing the requeuing
ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
- connector,connection,queueTracker,currentTime);
+ connector,connection,rt,currentTime);
// Finally, completed expiration of the document.
dqd.setProcessed();
}
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java Mon Dec 2 11:27:35 2013
@@ -33,20 +33,17 @@ public class JobResetThread extends Thre
public static final String _rcsid = "@(#)$Id: JobResetThread.java 991295 2010-08-31 19:12:14Z kwright $";
// Local data
- /** Queue tracker */
- protected final QueueTracker queueTracker;
/** Process ID */
protected final String processID;
/** Constructor.
*/
- public JobResetThread(QueueTracker queueTracker, String processID)
+ public JobResetThread(String processID)
throws ManifoldCFException
{
super();
setName("Job reset thread");
setDaemon(true);
- this.queueTracker = queueTracker;
this.processID = processID;
}
@@ -109,7 +106,7 @@ public class JobResetThread extends Thre
{
Logging.threads.debug("Job reset thread reprioritizing documents...");
- ManifoldCF.resetAllDocumentPriorities(threadContext,queueTracker,currentTime);
+ ManifoldCF.resetAllDocumentPriorities(threadContext,currentTime,processID);
Logging.threads.debug("Job reset thread done reprioritizing documents.");
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Mon Dec 2 11:27:35 2013
@@ -519,10 +519,12 @@ public class ManifoldCF extends org.apac
IConnectorManager repConnMgr = ConnectorManagerFactory.make(threadcontext);
IRepositoryConnectionManager repCon = RepositoryConnectionManagerFactory.make(threadcontext);
IJobManager jobManager = JobManagerFactory.make(threadcontext);
+ IBinManager binManager = BinManagerFactory.make(threadcontext);
org.apache.manifoldcf.authorities.system.ManifoldCF.installSystemTables(threadcontext);
repConnMgr.install();
repCon.install();
jobManager.install();
+ binManager.install();
}
/** Uninstall all the crawler system tables.
@@ -534,6 +536,8 @@ public class ManifoldCF extends org.apac
IConnectorManager repConnMgr = ConnectorManagerFactory.make(threadcontext);
IRepositoryConnectionManager repCon = RepositoryConnectionManagerFactory.make(threadcontext);
IJobManager jobManager = JobManagerFactory.make(threadcontext);
+ IBinManager binManager = BinManagerFactory.make(threadcontext);
+ binManager.deinstall();
jobManager.deinstall();
repCon.deinstall();
repConnMgr.deinstall();
@@ -839,13 +843,14 @@ public class ManifoldCF extends org.apac
/** Requeue documents due to carrydown.
*/
- public static void requeueDocumentsDueToCarrydown(IJobManager jobManager, DocumentDescription[] requeueCandidates,
- IRepositoryConnector connector, IRepositoryConnection connection, QueueTracker queueTracker, long currentTime)
+ public static void requeueDocumentsDueToCarrydown(IJobManager jobManager,
+ DocumentDescription[] requeueCandidates,
+ IRepositoryConnector connector, IRepositoryConnection connection, ReprioritizationTracker rt, long currentTime)
throws ManifoldCFException
{
// A list of document descriptions from finishDocuments() above represents those documents that may need to be requeued, for the
// reason that carrydown information for those documents has changed. In order to requeue, we need to calculate document priorities, however.
- double[] docPriorities = new double[requeueCandidates.length];
+ IPriorityCalculator[] docPriorities = new IPriorityCalculator[requeueCandidates.length];
String[][] binNames = new String[requeueCandidates.length][];
int q = 0;
while (q < requeueCandidates.length)
@@ -853,27 +858,12 @@ public class ManifoldCF extends org.apac
DocumentDescription dd = requeueCandidates[q];
String[] bins = calculateBins(connector,dd.getDocumentIdentifier());
binNames[q] = bins;
- docPriorities[q] = queueTracker.calculatePriority(bins,connection);
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Document '"+dd.getDocumentIdentifier()+" given priority "+new Double(docPriorities[q]).toString());
+ docPriorities[q] = new PriorityCalculator(rt,connection,bins);
q++;
}
// Now, requeue the documents with the new priorities
- boolean[] trackerNote = jobManager.carrydownChangeDocumentMultiple(requeueCandidates,currentTime,docPriorities);
-
- // Free the unused priorities.
- // Inform queuetracker about what we used and what we didn't
- q = 0;
- while (q < trackerNote.length)
- {
- if (trackerNote[q] == false)
- {
- String[] bins = binNames[q];
- queueTracker.notePriorityNotUsed(bins,connection,docPriorities[q]);
- }
- q++;
- }
+ jobManager.carrydownChangeDocumentMultiple(requeueCandidates,currentTime,docPriorities);
}
/** Stuff colons so we can't have conflicts. */
@@ -917,64 +907,54 @@ public class ManifoldCF extends org.apac
return connector.getBinNames(documentIdentifier);
}
- protected final static String resetDocPrioritiesLock = "_RESETPRIORITIES_";
-
/** Reset all (active) document priorities. This operation may occur due to various externally-triggered
* events, such a job abort, pause, resume, wait, or unwait.
*/
- public static void resetAllDocumentPriorities(IThreadContext threadContext, QueueTracker queueTracker, long currentTime)
+ public static void resetAllDocumentPriorities(IThreadContext threadContext, long currentTime, String processID)
throws ManifoldCFException
{
ILockManager lockManager = LockManagerFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
IRepositoryConnectionManager connectionManager = RepositoryConnectionManagerFactory.make(threadContext);
-
- // Only one thread allowed at a time
- lockManager.enterWriteLock(resetDocPrioritiesLock);
- try
- {
- // Reset the queue tracker
- queueTracker.beginReset();
- // Perform the reprioritization, for all active documents in active jobs. During this time,
- // it is safe to have other threads assign new priorities to documents, but it is NOT safe
- // for other threads to attempt to change the minimum priority level. The queuetracker object
- // will therefore block that from occurring, until the reset is complete.
- try
- {
- // Reprioritize all documents in the jobqueue, 1000 at a time
+ ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
- Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
- Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+ String reproID = IDFactory.make(threadContext);
- // Do the 'not yet processed' documents only. Documents that are queued for reprocessing will be assigned
- // new priorities. Already processed documents won't. This guarantees that our bins are appropriate for current thread
- // activity.
- // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
- // priorities.
- while (true)
- {
- long startTime = System.currentTimeMillis();
+ rt.startReprioritization(System.currentTimeMillis(),processID,reproID);
+ // Reprioritize all documents in the jobqueue, 1000 at a time
- DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime, 10000);
- if (docs.length == 0)
- break;
-
- // Calculate new priorities for all these documents
- writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,
- queueTracker,currentTime);
+ Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
+ Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+
+ // Do the 'not yet processed' documents only. Documents that are queued for reprocessing will be assigned
+ // new priorities. Already processed documents won't. This guarantees that our bins are appropriate for current thread
+ // activity.
+ // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
+ // priorities.
+ while (true)
+ {
+ long startTime = System.currentTimeMillis();
- Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
- }
- }
- finally
+ Long currentTimeValue = rt.checkReprioritizationInProgress();
+ if (currentTimeValue == null)
{
- queueTracker.endReset();
+ // Some other process or thread superceded us.
+ return;
}
+ long updateTime = currentTimeValue.longValue();
+
+ DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+ if (docs.length == 0)
+ break;
+
+ // Calculate new priorities for all these documents
+ writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,
+ rt,updateTime);
+
+ Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
}
- finally
- {
- lockManager.leaveWriteLock(resetDocPrioritiesLock);
- }
+
+ rt.doneReprioritization(reproID);
}
/** Write a set of document priorities, based on the current queue tracker.
@@ -982,14 +962,14 @@ public class ManifoldCF extends org.apac
public static void writeDocumentPriorities(IThreadContext threadContext, IRepositoryConnectionManager mgr,
IJobManager jobManager, DocumentDescription[] descs,
Map<String,IRepositoryConnection> connectionMap, Map<Long,IJobDescription> jobDescriptionMap,
- QueueTracker queueTracker, long currentTime)
+ ReprioritizationTracker rt, long currentTime)
throws ManifoldCFException
{
if (Logging.scheduling.isDebugEnabled())
Logging.scheduling.debug("Reprioritizing "+Integer.toString(descs.length)+" documents");
- double[] priorities = new double[descs.length];
+ IPriorityCalculator[] priorities = new IPriorityCalculator[descs.length];
// Go through the documents and calculate the priorities
int i = 0;
@@ -1029,9 +1009,7 @@ public class ManifoldCF extends org.apac
RepositoryConnectorFactory.release(connector);
}
- priorities[i] = queueTracker.calculatePriority(binNames,connection);
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Document '"+dd.getDocumentIdentifier()+"' given priority "+new Double(priorities[i]).toString());
+ priorities[i] = new PriorityCalculator(rt,connection,binNames);
i++;
}
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java Mon Dec 2 11:27:35 2013
@@ -40,7 +40,7 @@ public class SeedingActivity implements
protected final String connectionName;
protected final IRepositoryConnectionManager connManager;
protected final IJobManager jobManager;
- protected final QueueTracker queueTracker;
+ protected final ReprioritizationTracker rt;
protected final IRepositoryConnection connection;
protected final IRepositoryConnector connector;
protected final Long jobID;
@@ -57,15 +57,16 @@ public class SeedingActivity implements
/** Constructor.
*/
- public SeedingActivity(String connectionName, IRepositoryConnectionManager connManager, IJobManager jobManager,
- QueueTracker queueTracker, IRepositoryConnection connection, IRepositoryConnector connector,
+ public SeedingActivity(String connectionName, IRepositoryConnectionManager connManager,
+ IJobManager jobManager,
+ ReprioritizationTracker rt, IRepositoryConnection connection, IRepositoryConnector connector,
Long jobID, String[] legalLinkTypes, boolean overrideSchedule, int hopcountMethod, String processID)
{
this.processID = processID;
this.connectionName = connectionName;
this.connManager = connManager;
this.jobManager = jobManager;
- this.queueTracker = queueTracker;
+ this.rt = rt;
this.connection = connection;
this.connector = connector;
this.jobID = jobID;
@@ -215,39 +216,22 @@ public class SeedingActivity implements
{
// First, prioritize the documents using the queue tracker
long prioritizationTime = System.currentTimeMillis();
- double[] docPriorities = new double[docIDHashes.length];
- String[][] binNames = new String[docIDHashes.length][];
+ IPriorityCalculator[] docPriorities = new IPriorityCalculator[docIDHashes.length];
int i = 0;
while (i < docIDHashes.length)
{
// Calculate desired document priority based on current queuetracker status.
String[] bins = connector.getBinNames(docIDs[i]);
-
- binNames[i] = bins;
- docPriorities[i] = queueTracker.calculatePriority(bins,connection);
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Giving document '"+docIDs[i]+"' priority "+new Double(docPriorities[i]).toString());
+ docPriorities[i] = new PriorityCalculator(rt,connection,bins);
i++;
}
- boolean[] trackerNote = jobManager.addDocumentsInitial(processID,
+ jobManager.addDocumentsInitial(processID,
jobID,legalLinkTypes,docIDHashes,docIDs,overrideSchedule,hopcountMethod,
prioritizationTime,docPriorities,prereqEventNames);
- // Inform queuetracker about what we used and what we didn't
- int j = 0;
- while (j < trackerNote.length)
- {
- if (trackerNote[j] == false)
- {
- String[] bins = binNames[j];
- queueTracker.notePriorityNotUsed(bins,connection,docPriorities[j]);
- }
- j++;
- }
-
}
/** Check whether current job is still active.
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java Mon Dec 2 11:27:35 2013
@@ -38,8 +38,6 @@ public class SeedingThread extends Threa
// Local data
/** Seeding reset manager */
protected final SeedingResetManager resetManager;
- /** Queue tracker */
- protected final QueueTracker queueTracker;
/** Process ID */
protected final String processID;
@@ -48,14 +46,13 @@ public class SeedingThread extends Threa
/** Constructor.
*/
- public SeedingThread(QueueTracker queueTracker, SeedingResetManager resetManager, String processID)
+ public SeedingThread(SeedingResetManager resetManager, String processID)
throws ManifoldCFException
{
super();
setName("Seeding thread");
setDaemon(true);
this.resetManager = resetManager;
- this.queueTracker = queueTracker;
this.processID = processID;
}
@@ -69,6 +66,7 @@ public class SeedingThread extends Threa
IThreadContext threadContext = ThreadContextFactory.make();
IJobManager jobManager = JobManagerFactory.make(threadContext);
IRepositoryConnectionManager connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+ ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
IDBInterface database = DBInterfaceFactory.make(threadContext,
ManifoldCF.getMasterDatabaseName(),
@@ -147,7 +145,8 @@ public class SeedingThread extends Threa
try
{
- SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,jobManager,queueTracker,
+ SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,
+ jobManager,rt,
connection,connector,jobID,legalLinkTypes,false,hopcountMethod,processID);
if (Logging.threads.isDebugEnabled())
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java Mon Dec 2 11:27:35 2013
@@ -38,8 +38,6 @@ public class SetPriorityThread extends T
public static final String _rcsid = "@(#)$Id: SetPriorityThread.java 988245 2010-08-23 18:39:35Z kwright $";
// Local data
- /** This is the queue tracker object. */
- protected final QueueTracker queueTracker;
/** This is the number of documents per cycle */
protected final int cycleCount;
/** The blocking documents object */
@@ -48,13 +46,11 @@ public class SetPriorityThread extends T
protected final String processID;
/** Constructor.
- *@param qt is the queue tracker object.
*/
- public SetPriorityThread(QueueTracker qt, int workerThreadCount, BlockingDocuments blockingDocuments, String processID)
+ public SetPriorityThread(int workerThreadCount, BlockingDocuments blockingDocuments, String processID)
throws ManifoldCFException
{
super();
- this.queueTracker = qt;
this.blockingDocuments = blockingDocuments;
this.processID = processID;
cycleCount = workerThreadCount * 10;
@@ -72,7 +68,8 @@ public class SetPriorityThread extends T
IThreadContext threadContext = ThreadContextFactory.make();
IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
-
+ ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
+
Logging.threads.debug("Set priority thread coming up");
// Job description map (local) - designed to improve performance.
@@ -129,7 +126,8 @@ public class SetPriorityThread extends T
DocumentDescription desc = blockingDocuments.getBlockingDocument();
if (desc != null)
{
- ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,queueTracker,currentTime);
+ ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,
+ new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,rt,currentTime);
processedCount++;
continue;
}
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java Mon Dec 2 11:27:35 2013
@@ -33,8 +33,6 @@ public class StartupThread extends Threa
public static final String _rcsid = "@(#)$Id: StartupThread.java 988245 2010-08-23 18:39:35Z kwright $";
// Local data
- /** Queue tracker */
- protected final QueueTracker queueTracker;
/** Process ID */
protected final String processID;
/** Reset manager */
@@ -42,13 +40,12 @@ public class StartupThread extends Threa
/** Constructor.
*/
- public StartupThread(QueueTracker queueTracker, StartupResetManager resetManager, String processID)
+ public StartupThread(StartupResetManager resetManager, String processID)
throws ManifoldCFException
{
super();
setName("Startup thread");
setDaemon(true);
- this.queueTracker = queueTracker;
this.resetManager = resetManager;
this.processID = processID;
}
@@ -63,6 +60,7 @@ public class StartupThread extends Threa
IThreadContext threadContext = ThreadContextFactory.make();
IJobManager jobManager = JobManagerFactory.make(threadContext);
IRepositoryConnectionManager connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+ ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
IDBInterface database = DBInterfaceFactory.make(threadContext,
ManifoldCF.getMasterDatabaseName(),
@@ -151,7 +149,8 @@ public class StartupThread extends Threa
try
{
- SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,jobManager,queueTracker,
+ SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,
+ jobManager,rt,
connection,connector,jobID,legalLinkTypes,true,hopcountMethod,processID);
if (Logging.threads.isDebugEnabled())
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java Mon Dec 2 11:27:35 2013
@@ -87,6 +87,7 @@ public class StufferThread extends Threa
IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
+ ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
Logging.threads.debug("Stuffer thread: Low water mark is "+Integer.toString(lowWaterMark)+"; amount per stuffing is "+Integer.toString(stuffAmt));
@@ -166,7 +167,7 @@ public class StufferThread extends Threa
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
- queueTracker.assessMinimumDepth(depthStatistics.getBins());
+ rt.assessMinimumDepth(depthStatistics.getBins());
// Set the last time to be the current time
lastTime = currentTime;
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Mon Dec 2 11:27:35 2013
@@ -74,8 +74,10 @@ public class WorkerThread extends Thread
IThreadContext threadContext = ThreadContextFactory.make();
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
+ IBinManager binManager = BinManagerFactory.make(threadContext);
IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
IOutputConnectionManager outputMgr = OutputConnectionManagerFactory.make(threadContext);
+ ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
List<DocumentToProcess> fetchList = new ArrayList<DocumentToProcess>();
Map<String,String> versionMap = new HashMap<String,String>();
@@ -527,7 +529,7 @@ public class WorkerThread extends Thread
// First, make the things we will need for all subsequent steps.
ProcessActivity activity = new ProcessActivity(processID,
- threadContext,queueTracker,jobManager,ingester,
+ threadContext,rt,jobManager,ingester,
currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion,newParameterVersion);
try
{
@@ -569,7 +571,8 @@ public class WorkerThread extends Thread
// "Finish" the documents (removing unneeded carrydown info, etc.)
DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode());
- ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+ ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+ requeueCandidates,connector,connection,rt,currentTime);
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents");
@@ -819,12 +822,12 @@ public class WorkerThread extends Thread
// Now, handle the delete list
processDeleteLists(outputName,connector,connection,jobManager,
deleteList,ingester,
- job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+ job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
// Handle hopcount removal
processHopcountRemovalLists(outputName,connector,connection,jobManager,
hopcountremoveList,ingester,
- job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+ job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
}
finally
@@ -975,17 +978,18 @@ public class WorkerThread extends Thread
* documents from the index should they be already present.
*/
protected static void processHopcountRemovalLists(String outputName, IRepositoryConnector connector,
- IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> hopcountremoveList,
+ IRepositoryConnection connection, IJobManager jobManager,
+ List<QueuedDocument> hopcountremoveList,
IIncrementalIngester ingester,
Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
- int hopcountMethod, QueueTracker queueTracker, long currentTime)
+ int hopcountMethod, ReprioritizationTracker rt, long currentTime)
throws ManifoldCFException
{
// Remove from index
hopcountremoveList = removeFromIndex(outputName,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
// Mark as 'hopcountremoved' in the job queue
processJobQueueHopcountRemovals(hopcountremoveList,connector,connection,
- jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+ jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
}
/** Clear specified documents out of the job queue and from the appliance.
@@ -996,17 +1000,18 @@ public class WorkerThread extends Thread
*@param ingesterDeleteList is a list of document id's to delete.
*/
protected static void processDeleteLists(String outputName, IRepositoryConnector connector,
- IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> deleteList,
+ IRepositoryConnection connection, IJobManager jobManager,
+ List<QueuedDocument> deleteList,
IIncrementalIngester ingester,
Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
- int hopcountMethod, QueueTracker queueTracker, long currentTime)
+ int hopcountMethod, ReprioritizationTracker rt, long currentTime)
throws ManifoldCFException
{
// Remove from index
deleteList = removeFromIndex(outputName,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
// Delete from the job queue
processJobQueueDeletions(deleteList,connector,connection,
- jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+ jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
}
/** Remove a specified set of documents from the index.
@@ -1086,7 +1091,7 @@ public class WorkerThread extends Thread
*/
protected static void processJobQueueDeletions(List<QueuedDocument> jobmanagerDeleteList,
IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
- Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+ Long jobID, String[] legalLinkTypes, int hopcountMethod, ReprioritizationTracker rt, long currentTime)
throws ManifoldCFException
{
// Now, do the document queue cleanup for deletions.
@@ -1103,7 +1108,8 @@ public class WorkerThread extends Thread
DocumentDescription[] requeueCandidates = jobManager.markDocumentDeletedMultiple(jobID,legalLinkTypes,deleteDescriptions,hopcountMethod);
// Requeue those documents that had carrydown data modifications
- ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+ ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+ requeueCandidates,connector,connection,rt,currentTime);
// Mark all these as done
for (int i = 0; i < jobmanagerDeleteList.size(); i++)
@@ -1118,7 +1124,7 @@ public class WorkerThread extends Thread
*/
protected static void processJobQueueHopcountRemovals(List<QueuedDocument> jobmanagerRemovalList,
IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
- Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+ Long jobID, String[] legalLinkTypes, int hopcountMethod, ReprioritizationTracker rt, long currentTime)
throws ManifoldCFException
{
// Now, do the document queue cleanup for deletions.
@@ -1135,7 +1141,8 @@ public class WorkerThread extends Thread
DocumentDescription[] requeueCandidates = jobManager.markDocumentHopcountRemovalMultiple(jobID,legalLinkTypes,removalDescriptions,hopcountMethod);
// Requeue those documents that had carrydown data modifications
- ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+ ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+ requeueCandidates,connector,connection,rt,currentTime);
// Mark all these as done
for (int i = 0; i < jobmanagerRemovalList.size(); i++)
@@ -1440,7 +1447,7 @@ public class WorkerThread extends Thread
protected final IRepositoryConnectionManager connMgr;
protected final String[] legalLinkTypes;
protected final OutputActivity ingestLogger;
- protected final QueueTracker queueTracker;
+ protected final ReprioritizationTracker rt;
protected final HashMap abortSet;
protected final String outputVersion;
protected final String parameterVersion;
@@ -1461,13 +1468,16 @@ public class WorkerThread extends Thread
*@param jobManager is the job manager
*@param ingester is the ingester
*/
- public ProcessActivity(String processID, IThreadContext threadContext, QueueTracker queueTracker, IJobManager jobManager, IIncrementalIngester ingester,
- long currentTime, IJobDescription job, IRepositoryConnection connection, IRepositoryConnector connector, IRepositoryConnectionManager connMgr,
- String[] legalLinkTypes, OutputActivity ingestLogger, HashMap abortSet, String outputVersion, String parameterVersion)
+ public ProcessActivity(String processID, IThreadContext threadContext,
+ ReprioritizationTracker rt, IJobManager jobManager,
+ IIncrementalIngester ingester, long currentTime,
+ IJobDescription job, IRepositoryConnection connection, IRepositoryConnector connector,
+ IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger,
+ HashMap abortSet, String outputVersion, String parameterVersion)
{
this.processID = processID;
this.threadContext = threadContext;
- this.queueTracker = queueTracker;
+ this.rt = rt;
this.jobManager = jobManager;
this.ingester = ingester;
this.currentTime = currentTime;
@@ -1989,16 +1999,15 @@ public class WorkerThread extends Thread
String[] docidHashes = new String[set.size()];
String[] docids = new String[set.size()];
- double[] priorities = new double[set.size()];
- String[][] binNames = new String[set.size()][];
+ IPriorityCalculator[] priorities = new IPriorityCalculator[set.size()];
String[][] dataNames = new String[docids.length][];
Object[][][] dataValues = new Object[docids.length][][];
String[][] eventNames = new String[docids.length][];
long currentTime = System.currentTimeMillis();
- int j = 0;
- while (j < docidHashes.length)
+ rt.clearPreloadRequests();
+ for (int j = 0; j < docidHashes.length; j++)
{
DocumentReference dr = (DocumentReference)set.get(j);
docidHashes[j] = dr.getLocalIdentifierHash();
@@ -2009,35 +2018,17 @@ public class WorkerThread extends Thread
// Calculate desired document priority based on current queuetracker status.
String[] bins = ManifoldCF.calculateBins(connector,dr.getLocalIdentifier());
-
-
- binNames[j] = bins;
- priorities[j] = queueTracker.calculatePriority(bins,connection);
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Assigning '"+docids[j]+"' priority "+new Double(priorities[j]).toString());
-
- // No longer used; the functionality is folded atomically into calculatePriority above:
- //queueTracker.notePrioritySet(currentTime,job.getID(),bins,connection);
-
- j++;
+ PriorityCalculator p = new PriorityCalculator(rt,connection,bins);
+ priorities[j] = p;
+ p.makePreloadRequest();
}
+ rt.preloadBinValues();
- boolean[] trackerNote = jobManager.addDocuments(processID,
+ jobManager.addDocuments(processID,
job.getID(),legalLinkTypes,docidHashes,docids,db.getParentIdentifierHash(),db.getLinkType(),job.getHopcountMode(),
dataNames,dataValues,currentTime,priorities,eventNames);
-
- // Inform queuetracker about what we used and what we didn't
- j = 0;
- while (j < trackerNote.length)
- {
- if (trackerNote[j] == false)
- {
- String[] bins = binNames[j];
- queueTracker.notePriorityNotUsed(bins,connection,priorities[j]);
- }
- j++;
- }
-
+
+ rt.clearPreloadedValues();
}
discard();
Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java Mon Dec 2 11:27:35 2013
@@ -100,6 +100,7 @@ public class BaseITDerby extends Connect
/** Construct a command url.
*/
protected String makeAPIURL(String command)
+ throws Exception
{
return mcfInstance.makeAPIURL(command);
}
Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java Mon Dec 2 11:27:35 2013
@@ -44,6 +44,12 @@ public class BaseITHSQLDB extends Connec
mcfInstance = new ManifoldCFInstance(singleWar);
}
+ public BaseITHSQLDB(boolean singleWar, boolean webapps)
+ {
+ super();
+ mcfInstance = new ManifoldCFInstance(singleWar, webapps);
+ }
+
// Basic job support
protected void waitJobInactiveNative(IJobManager jobManager, Long jobID, long maxTime)
@@ -101,6 +107,7 @@ public class BaseITHSQLDB extends Connec
/** Construct a command url.
*/
protected String makeAPIURL(String command)
+ throws Exception
{
return mcfInstance.makeAPIURL(command);
}
Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java Mon Dec 2 11:27:35 2013
@@ -101,6 +101,7 @@ public class BaseITMySQL extends Connect
/** Construct a command url.
*/
protected String makeAPIURL(String command)
+ throws Exception
{
return mcfInstance.makeAPIURL(command);
}
Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java Mon Dec 2 11:27:35 2013
@@ -101,6 +101,7 @@ public class BaseITPostgresql extends Co
/** Construct a command url.
*/
protected String makeAPIURL(String command)
+ throws Exception
{
return mcfInstance.makeAPIURL(command);
}
Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java Mon Dec 2 11:27:35 2013
@@ -56,6 +56,7 @@ import org.apache.http.entity.ContentTyp
/** Tests that run the "agents daemon" should be derived from this */
public class ManifoldCFInstance
{
+ protected boolean webapps = true;
protected boolean singleWar = false;
protected int testPort = 8346;
@@ -68,16 +69,27 @@ public class ManifoldCFInstance
public ManifoldCFInstance(boolean singleWar)
{
- this(8346,singleWar);
+ this(8346,singleWar,true);
+ }
+
+ public ManifoldCFInstance(boolean singleWar, boolean webapps)
+ {
+ this(8346,singleWar,webapps);
}
public ManifoldCFInstance(int testPort)
{
- this(testPort,false);
+ this(testPort,false,true);
}
public ManifoldCFInstance(int testPort, boolean singleWar)
{
+ this(testPort,singleWar,true);
+ }
+
+ public ManifoldCFInstance(int testPort, boolean singleWar, boolean webapps)
+ {
+ this.webapps = webapps;
this.testPort = testPort;
this.singleWar = singleWar;
}
@@ -277,11 +289,17 @@ public class ManifoldCFInstance
/** Construct a command url.
*/
public String makeAPIURL(String command)
+ throws Exception
{
- if (singleWar)
- return "http://localhost:"+Integer.toString(testPort)+"/mcf/api/json/"+command;
+ if (webapps)
+ {
+ if (singleWar)
+ return "http://localhost:"+Integer.toString(testPort)+"/mcf/api/json/"+command;
+ else
+ return "http://localhost:"+Integer.toString(testPort)+"/mcf-api-service/json/"+command;
+ }
else
- return "http://localhost:"+Integer.toString(testPort)+"/mcf-api-service/json/"+command;
+ throw new Exception("No API servlet running");
}
public static String convertToString(HttpResponse httpResponse)
@@ -495,53 +513,64 @@ public class ManifoldCFInstance
public void start()
throws Exception
{
- // Start jetty
- server = new Server( testPort );
- server.setStopAtShutdown( true );
- // Initialize the servlets
ContextHandlerCollection contexts = new ContextHandlerCollection();
- server.setHandler(contexts);
+ if (webapps)
+ {
+ // Start jetty
+ server = new Server( testPort );
+ server.setStopAtShutdown( true );
+ // Initialize the servlets
+ server.setHandler(contexts);
+ }
if (singleWar)
{
- // Start the single combined war
- String combinedWarPath = "../../framework/build/war-proprietary/mcf-combined-service.war";
- if (System.getProperty("combinedWarPath") != null)
- combinedWarPath = System.getProperty("combinedWarPath");
-
- // Initialize the servlet
- WebAppContext lcfCombined = new WebAppContext(combinedWarPath,"/mcf");
- // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
- lcfCombined.setParentLoaderPriority(true);
- contexts.addHandler(lcfCombined);
- server.start();
+ if (webapps)
+ {
+ // Start the single combined war
+ String combinedWarPath = "../../framework/build/war-proprietary/mcf-combined-service.war";
+ if (System.getProperty("combinedWarPath") != null)
+ combinedWarPath = System.getProperty("combinedWarPath");
+
+ // Initialize the servlet
+ WebAppContext lcfCombined = new WebAppContext(combinedWarPath,"/mcf");
+ // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
+ lcfCombined.setParentLoaderPriority(true);
+ contexts.addHandler(lcfCombined);
+ server.start();
+ }
+ else
+ throw new Exception("Can't run singleWar without webapps");
}
else
{
- String crawlerWarPath = "../../framework/build/war-proprietary/mcf-crawler-ui.war";
- String authorityserviceWarPath = "../../framework/build/war-proprietary/mcf-authority-service.war";
- String apiWarPath = "../../framework/build/war-proprietary/mcf-api-service.war";
-
- if (System.getProperty("crawlerWarPath") != null)
- crawlerWarPath = System.getProperty("crawlerWarPath");
- if (System.getProperty("authorityserviceWarPath") != null)
- authorityserviceWarPath = System.getProperty("authorityserviceWarPath");
- if (System.getProperty("apiWarPath") != null)
- apiWarPath = System.getProperty("apiWarPath");
-
- // Initialize the servlets
- WebAppContext lcfCrawlerUI = new WebAppContext(crawlerWarPath,"/mcf-crawler-ui");
- // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
- lcfCrawlerUI.setParentLoaderPriority(true);
- contexts.addHandler(lcfCrawlerUI);
- WebAppContext lcfAuthorityService = new WebAppContext(authorityserviceWarPath,"/mcf-authority-service");
- // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
- lcfAuthorityService.setParentLoaderPriority(true);
- contexts.addHandler(lcfAuthorityService);
- WebAppContext lcfApi = new WebAppContext(apiWarPath,"/mcf-api-service");
- lcfApi.setParentLoaderPriority(true);
- contexts.addHandler(lcfApi);
- server.start();
+ if (webapps)
+ {
+ String crawlerWarPath = "../../framework/build/war-proprietary/mcf-crawler-ui.war";
+ String authorityserviceWarPath = "../../framework/build/war-proprietary/mcf-authority-service.war";
+ String apiWarPath = "../../framework/build/war-proprietary/mcf-api-service.war";
+
+ if (System.getProperty("crawlerWarPath") != null)
+ crawlerWarPath = System.getProperty("crawlerWarPath");
+ if (System.getProperty("authorityserviceWarPath") != null)
+ authorityserviceWarPath = System.getProperty("authorityserviceWarPath");
+ if (System.getProperty("apiWarPath") != null)
+ apiWarPath = System.getProperty("apiWarPath");
+
+ // Initialize the servlets
+ WebAppContext lcfCrawlerUI = new WebAppContext(crawlerWarPath,"/mcf-crawler-ui");
+ // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
+ lcfCrawlerUI.setParentLoaderPriority(true);
+ contexts.addHandler(lcfCrawlerUI);
+ WebAppContext lcfAuthorityService = new WebAppContext(authorityserviceWarPath,"/mcf-authority-service");
+ // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
+ lcfAuthorityService.setParentLoaderPriority(true);
+ contexts.addHandler(lcfAuthorityService);
+ WebAppContext lcfApi = new WebAppContext(apiWarPath,"/mcf-api-service");
+ lcfApi.setParentLoaderPriority(true);
+ contexts.addHandler(lcfApi);
+ server.start();
+ }
// If all worked, then we can start the daemon.
// Clear the agents shutdown signal.