You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/02 12:27:36 UTC
svn commit: r1546965 [1/2] - in /manifoldcf/trunk: ./ framework/
framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
framework/agents/src/main/java/org/apache/manifoldcf/agents/system/
framework/pull-agent/src/main/java/org/apache/m...
Author: kwright
Date: Mon Dec 2 11:27:35 2013
New Revision: 1546965
URL: http://svn.apache.org/r1546965
Log:
Scheduler changes to support CONNECTORS-781. Introduce a new database table to track bins, and implement minimumDepth using lock-manager constructs so that it works across the cluster. WARNING!!! Schema change!
Added:
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/bins/
- copied from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/bins/
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/BinManagerFactory.java
- copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/BinManagerFactory.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IBinManager.java
- copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IBinManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IPriorityCalculator.java
- copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IPriorityCalculator.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PriorityCalculator.java
- copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PriorityCalculator.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java
- copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java
manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/NullOutputConnector.java
- copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/NullOutputConnector.java
manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulerHSQLDBTest.java
- copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulerHSQLDBTest.java
manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulerTester.java
- copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulerTester.java
manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulingRepositoryConnector.java
- copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulingRepositoryConnector.java
Modified:
manifoldcf/trunk/ (props changed)
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
manifoldcf/trunk/framework/build.xml
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java
manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java
manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java
manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java
manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java
Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
Merged /manifoldcf/branches/CONNECTORS-781:r1545632-1546964
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java Mon Dec 2 11:27:35 2013
@@ -70,8 +70,9 @@ public interface IAgent
* Call this method to clean up dangling persistent state when a cluster is just starting
* to come up. This method CANNOT be called when there are any active agents
* processes at all.
+ *@param currentProcessID is the current process ID.
*/
- public void cleanUpAgentData(IThreadContext threadContext)
+ public void cleanUpAllAgentData(IThreadContext threadContext, String currentProcessID)
throws ManifoldCFException;
/** Cleanup after agents process.
@@ -79,9 +80,10 @@ public interface IAgent
* This method CANNOT be called when the agent is active, but it can
* be called at any time and by any process in order to guarantee that a terminated
* agent does not block other agents from completing their tasks.
- *@param processID is the process ID of the agent to clean up after.
+ *@param currentProcessID is the current process ID.
+ *@param cleanupProcessID is the process ID of the agent to clean up after.
*/
- public void cleanUpAgentData(IThreadContext threadContext, String processID)
+ public void cleanUpAgentData(IThreadContext threadContext, String currentProcessID, String cleanupProcessID)
throws ManifoldCFException;
/** Start the agent. This method should spin up the agent threads, and
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java Mon Dec 2 11:27:35 2013
@@ -166,6 +166,8 @@ public class ManifoldCF extends org.apac
/** Agent service name prefix (followed by agent class name) */
public static final String agentServicePrefix = "AGENT_";
+ protected static AgentsThread agentsThread = null;
+
/** Run agents process.
* This method will not return until a shutdown signal is sent.
*/
@@ -178,24 +180,68 @@ public class ManifoldCF extends org.apac
if (lockManager.checkGlobalFlag(agentShutdownSignal))
return;
+ // Create and start agents thread.
+ startAgents(threadContext, processID);
+
while (true)
{
// Any shutdown signal yet?
if (lockManager.checkGlobalFlag(agentShutdownSignal))
break;
- // Start whatever agents need to be started
- checkAgents(threadContext, processID);
-
try
{
- ManifoldCF.sleep(5000);
+ ManifoldCF.sleep(5000L);
}
catch (InterruptedException e)
{
break;
}
}
+
+ }
+
+ /** Start agents thread.
+ */
+ public static void startAgents(IThreadContext threadContext, String processID)
+ throws ManifoldCFException
+ {
+ // Create and start agents thread.
+ agentsThread = new AgentsThread(processID);
+ agentsThread.start();
+ }
+
+ /** Stop all started agents.
+ */
+ public static void stopAgents(IThreadContext threadContext, String processID)
+ throws ManifoldCFException
+ {
+ // Shut down agents background thread.
+ while (agentsThread != null)
+ {
+ agentsThread.interrupt();
+ if (!agentsThread.isAlive())
+ agentsThread = null;
+ }
+
+ // Shut down running agents services directly.
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ synchronized (runningHash)
+ {
+ // This is supposedly safe; iterator remove is used
+ Iterator<String> iter = runningHash.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String className = iter.next();
+ IAgent agent = runningHash.get(className);
+ // Stop it
+ agent.stopAgent(threadContext);
+ lockManager.endServiceActivity(getAgentsClassServiceType(className), processID);
+ iter.remove();
+ agent.cleanUp(threadContext);
+ }
+ }
+ // Done.
}
protected static String getAgentsClassServiceType(String agentClassName)
@@ -203,22 +249,80 @@ public class ManifoldCF extends org.apac
return agentServicePrefix + agentClassName;
}
+ /** Agents thread. This runs in background until interrupted, at which point
+ * it shuts down. Its responsibilities include cleaning up after dead processes,
+ * as well as starting newly-registered agent processes, and terminating ones that disappear.
+ */
+ protected static class AgentsThread extends Thread
+ {
+ protected final String processID;
+
+ public AgentsThread(String processID)
+ {
+ super();
+ this.processID = processID;
+ setName("Agents thread");
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ IThreadContext threadContext = ThreadContextFactory.make();
+ while (true)
+ {
+ try
+ {
+ if (Thread.currentThread().isInterrupted())
+ throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+ checkAgents(threadContext, processID);
+ ManifoldCF.sleep(5000L);
+ }
+ catch (InterruptedException e)
+ {
+ break;
+ }
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+ break;
+ Logging.agents.error("Exception tossed: "+e.getMessage(),e);
+ }
+ catch (OutOfMemoryError e)
+ {
+ System.err.println("Agents process ran out of memory - shutting down");
+ e.printStackTrace(System.err);
+ System.exit(-200);
+ }
+ catch (Throwable e)
+ {
+ Logging.agents.fatal("Error tossed: "+e.getMessage(),e);
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ // Severe error on initialization
+ System.err.println("Agents process could not start - shutting down");
+ Logging.agents.fatal("AgentThread initialization error tossed: "+e.getMessage(),e);
+ System.exit(-300);
+ }
+ }
+ }
+
/** Start all not-running agents.
*@param threadContext is the thread context.
*/
- public static void checkAgents(IThreadContext threadContext, String processID)
+ protected static void checkAgents(IThreadContext threadContext, String processID)
throws ManifoldCFException
{
ILockManager lockManager = LockManagerFactory.make(threadContext);
// Get agent manager
IAgentManager manager = AgentManagerFactory.make(threadContext);
- ManifoldCFException problem = null;
synchronized (runningHash)
{
- // DO NOT permit this method to do anything if stopAgents() has ever been called for this JVM!
- // (If it has, it means that the JVM is trying to shut down.)
- if (stopAgentsRun)
- return;
String[] classes = manager.getAllAgents();
Set<String> currentAgentClasses = new HashSet<String>();
@@ -235,7 +339,7 @@ public class ManifoldCF extends org.apac
{
// Throw a lock, so that cleanup processes and startup processes don't collide.
String serviceType = getAgentsClassServiceType(className);
- lockManager.registerServiceBeginServiceActivity(serviceType, processID, new CleanupAgent(threadContext, agent));
+ lockManager.registerServiceBeginServiceActivity(serviceType, processID, new CleanupAgent(threadContext, agent, processID));
// There is a potential race condition where the agent has been started but hasn't yet appeared in runningHash.
// But having runningHash be the synchronizer for this activity will prevent any problems.
agent.startAgent(threadContext, processID);
@@ -244,8 +348,9 @@ public class ManifoldCF extends org.apac
}
catch (ManifoldCFException e)
{
- problem = e;
- agent.cleanUp(threadContext);
+ if (e.getErrorCode() != ManifoldCFException.INTERRUPTED)
+ agent.cleanUp(threadContext);
+ throw e;
}
}
currentAgentClasses.add(className);
@@ -260,25 +365,15 @@ public class ManifoldCF extends org.apac
{
// Shut down this one agent.
IAgent agent = runningHash.get(runningAgentClass);
- try
- {
- // Stop it
- agent.stopAgent(threadContext);
- lockManager.endServiceActivity(getAgentsClassServiceType(runningAgentClass), processID);
- runningAgentsIterator.remove();
- agent.cleanUp(threadContext);
- }
- catch (ManifoldCFException e)
- {
- problem = e;
- }
+ // Stop it
+ agent.stopAgent(threadContext);
+ lockManager.endServiceActivity(getAgentsClassServiceType(runningAgentClass), processID);
+ runningAgentsIterator.remove();
+ agent.cleanUp(threadContext);
}
}
}
- if (problem != null)
- throw problem;
-
synchronized (runningHash)
{
// For every class we're supposed to be running, find registered but no-longer-active instances and clean
@@ -286,7 +381,7 @@ public class ManifoldCF extends org.apac
for (String agentsClass : runningHash.keySet())
{
IAgent agent = runningHash.get(agentsClass);
- IServiceCleanup cleanup = new CleanupAgent(threadContext, agent);
+ IServiceCleanup cleanup = new CleanupAgent(threadContext, agent, processID);
String agentsClassServiceType = getAgentsClassServiceType(agentsClass);
while (!lockManager.cleanupInactiveService(agentsClassServiceType, cleanup))
{
@@ -297,39 +392,18 @@ public class ManifoldCF extends org.apac
}
- /** Stop all started agents.
- */
- public static void stopAgents(IThreadContext threadContext, String processID)
- throws ManifoldCFException
- {
- ILockManager lockManager = LockManagerFactory.make(threadContext);
- synchronized (runningHash)
- {
- // This is supposedly safe; iterator remove is used
- Iterator<String> iter = runningHash.keySet().iterator();
- while (iter.hasNext())
- {
- String className = iter.next();
- IAgent agent = runningHash.get(className);
- // Stop it
- agent.stopAgent(threadContext);
- lockManager.endServiceActivity(getAgentsClassServiceType(className), processID);
- iter.remove();
- agent.cleanUp(threadContext);
- }
- }
- // Done.
- }
protected static class CleanupAgent implements IServiceCleanup
{
protected final IAgent agent;
protected final IThreadContext threadContext;
-
- public CleanupAgent(IThreadContext threadContext, IAgent agent)
+ protected final String processID;
+
+ public CleanupAgent(IThreadContext threadContext, IAgent agent, String processID)
{
this.agent = agent;
this.threadContext = threadContext;
+ this.processID = processID;
}
/** Clean up after the specified service. This method will block any startup of the specified
@@ -340,7 +414,7 @@ public class ManifoldCF extends org.apac
public void cleanUpService(String serviceName)
throws ManifoldCFException
{
- agent.cleanUpAgentData(threadContext, serviceName);
+ agent.cleanUpAgentData(threadContext, processID, serviceName);
}
/** Clean up after ALL services of the type on the cluster.
@@ -349,7 +423,7 @@ public class ManifoldCF extends org.apac
public void cleanUpAllServices()
throws ManifoldCFException
{
- agent.cleanUpAgentData(threadContext);
+ agent.cleanUpAllAgentData(threadContext, processID);
}
/** Perform cluster initialization - that is, whatever is needed presuming that the
@@ -360,8 +434,7 @@ public class ManifoldCF extends org.apac
public void clusterInit()
throws ManifoldCFException
{
- // MHL - we really want a separate clusterInit in agents
- agent.cleanUpAgentData(threadContext);
+ agent.clusterInit(threadContext);
}
}
Modified: manifoldcf/trunk/framework/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/build.xml?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/build.xml (original)
+++ manifoldcf/trunk/framework/build.xml Mon Dec 2 11:27:35 2013
@@ -1486,6 +1486,25 @@
</junit>
</target>
+ <target name="run-pull-agent-tests" depends="compile-pull-agent,compile-pull-agent-tests">
+ <mkdir dir="test-output"/>
+ <junit fork="true" maxmemory="128m" dir="test-output" outputtoformatters="true" showoutput="true" haltonfailure="true">
+ <classpath>
+ <path refid="framework-classpath"/>
+ <pathelement location="build/core/classes"/>
+ <pathelement location="build/core-tests/classes"/>
+ <pathelement location="build/agents/classes"/>
+ <pathelement location="build/agents-tests/classes"/>
+ <pathelement location="build/pull-agent/classes"/>
+ <pathelement location="build/pull-agent-tests/classes"/>
+ </classpath>
+ <formatter type="brief" usefile="false"/>
+
+ <test name="org.apache.manifoldcf.crawler.tests.SchedulerHSQLDBTest" todir="test-output"/>
+
+ </junit>
+ </target>
+
<target name="run-script-engine-tests" depends="compile-core,compile-script-engine,compile-script-engine-tests">
<mkdir dir="test-output"/>
<junit fork="true" maxmemory="128m" dir="test-output" outputtoformatters="true" showoutput="true" haltonfailure="true">
@@ -1519,7 +1538,7 @@
</junit>
</target>
- <target name="run-tests" depends="compile-tests,run-core-tests,run-script-engine-tests"/>
+ <target name="run-tests" depends="compile-tests,run-core-tests,run-pull-agent-tests,run-script-engine-tests"/>
<target name="run-tests-derby" depends="compile-tests">
<mkdir dir="test-derby-output"/>
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java Mon Dec 2 11:27:35 2013
@@ -64,6 +64,7 @@ public abstract class BaseRepositoryConn
* This must return a model value as specified above.
*@return the model type value.
*/
+ @Override
public int getConnectorModel()
{
// Return the simplest model - full everything
@@ -73,6 +74,7 @@ public abstract class BaseRepositoryConn
/** Return the list of activities that this connector supports (i.e. writes into the log).
*@return the list.
*/
+ @Override
public String[] getActivitiesList()
{
return new String[0];
@@ -81,6 +83,7 @@ public abstract class BaseRepositoryConn
/** Return the list of relationship types that this connector recognizes.
*@return the list.
*/
+ @Override
public String[] getRelationshipTypes()
{
// The base situation is that there are no relationships.
@@ -97,6 +100,7 @@ public abstract class BaseRepositoryConn
*@return the set of bin names. If an empty array is returned, it is equivalent to there being no request
* rate throttling available for this identifier.
*/
+ @Override
public String[] getBinNames(String documentIdentifier)
{
// Base version has one bin for all documents. Use empty string for this since "*" would make
@@ -111,6 +115,7 @@ public abstract class BaseRepositoryConn
*@param command is the command, which is taken directly from the API request.
*@return true if the resource is found, false if not. In either case, output may be filled in.
*/
+ @Override
public boolean requestInfo(Configuration output, String command)
throws ManifoldCFException
{
@@ -143,6 +148,7 @@ public abstract class BaseRepositoryConn
*@param endTime is the end of the time range to consider, exclusive.
*@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
*/
+ @Override
public void addSeedDocuments(ISeedingActivity activities, DocumentSpecification spec,
long startTime, long endTime, int jobMode)
throws ManifoldCFException, ServiceInterruption
@@ -285,6 +291,7 @@ public abstract class BaseRepositoryConn
* Empty version strings indicate that there is no versioning ability for the corresponding document, and the document
* will always be processed.
*/
+ @Override
public String[] getDocumentVersions(String[] documentIdentifiers, String[] oldVersions, IVersionActivity activities,
DocumentSpecification spec, int jobMode, boolean usesDefaultAuthority)
throws ManifoldCFException, ServiceInterruption
@@ -387,6 +394,7 @@ public abstract class BaseRepositoryConn
*@param documentIdentifiers is the set of document identifiers.
*@param versions is the corresponding set of version identifiers (individual identifiers may be null).
*/
+ @Override
public void releaseDocumentVersions(String[] documentIdentifiers, String[] versions)
throws ManifoldCFException
{
@@ -396,6 +404,7 @@ public abstract class BaseRepositoryConn
/** Get the maximum number of documents to amalgamate together into one batch, for this connector.
*@return the maximum number. 0 indicates "unlimited".
*/
+ @Override
public int getMaxDocumentRequest()
{
// Base implementation does one at a time.
@@ -416,6 +425,7 @@ public abstract class BaseRepositoryConn
* should only find other references, and should not actually call the ingestion methods.
*@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
*/
+ @Override
public void processDocuments(String[] documentIdentifiers, String[] versions, IProcessActivity activities,
DocumentSpecification spec, boolean[] scanOnly, int jobMode)
throws ManifoldCFException, ServiceInterruption
@@ -461,6 +471,7 @@ public abstract class BaseRepositoryConn
*@param ds is the current document specification for this job.
*@param tabsArray is an array of tab names. Add to this array any tab names that are specific to the connector.
*/
+ @Override
public void outputSpecificationHeader(IHTTPOutput out, Locale locale, DocumentSpecification ds, List<String> tabsArray)
throws ManifoldCFException, IOException
{
@@ -502,6 +513,7 @@ public abstract class BaseRepositoryConn
*@param ds is the current document specification for this job.
*@param tabName is the current tab name.
*/
+ @Override
public void outputSpecificationBody(IHTTPOutput out, Locale locale, DocumentSpecification ds, String tabName)
throws ManifoldCFException, IOException
{
@@ -532,6 +544,7 @@ public abstract class BaseRepositoryConn
*@return null if all is well, or a string error message if there is an error that should prevent saving of
* the job (and cause a redirection to an error page).
*/
+ @Override
public String processSpecificationPost(IPostParameters variableContext, Locale locale, DocumentSpecification ds)
throws ManifoldCFException
{
@@ -561,6 +574,7 @@ public abstract class BaseRepositoryConn
*@param locale is the locale the output is preferred to be in.
*@param ds is the current document specification for this job.
*/
+ @Override
public void viewSpecification(IHTTPOutput out, Locale locale, DocumentSpecification ds)
throws ManifoldCFException, IOException
{
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Dec 2 11:27:35 2013
@@ -228,7 +228,7 @@ public interface IJobManager
*@param descriptions are the document descriptions.
*@param priorities are the desired priorities.
*/
- public void writeDocumentPriorities(long currentTime, DocumentDescription[] descriptions, double[] priorities)
+ public void writeDocumentPriorities(long currentTime, DocumentDescription[] descriptions, IPriorityCalculator[] priorities)
throws ManifoldCFException;
// This method supports the "expiration" thread
@@ -410,9 +410,8 @@ public interface IJobManager
* extent that if one is *already* being processed, it will need to be done over again.
*@param documentDescriptions is the set of description objects for the documents that have had their parent carrydown information changed.
*@param docPriorities are the document priorities to assign to the documents, if needed.
- *@return a flag for each document priority, true if it was used, false otherwise.
*/
- public boolean[] carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, double[] docPriorities)
+ public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, IPriorityCalculator[] docPriorities)
throws ManifoldCFException;
/** Requeue a document because of carrydown changes.
@@ -420,9 +419,8 @@ public interface IJobManager
* extent that if it is *already* being processed, it will need to be done over again.
*@param documentDescription is the description object for the document that has had its parent carrydown information changed.
*@param docPriority is the document priority to assign to the document, if needed.
- *@return a flag for the document priority, true if it was used, false otherwise.
*/
- public boolean carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, double docPriority)
+ public void carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, IPriorityCalculator docPriority)
throws ManifoldCFException;
/** Requeue a document for further processing in the future.
@@ -528,12 +526,11 @@ public interface IJobManager
*@param currentTime is the current time in milliseconds since epoch.
*@param documentPriorities are the document priorities corresponding to the document identifiers.
*@param prereqEventNames are the events that must be completed before each document can be processed.
- *@return true if the priority value(s) were used, false otherwise.
*/
- public boolean[] addDocumentsInitial(String processID,
+ public void addDocumentsInitial(String processID,
Long jobID, String[] legalLinkTypes,
String[] docIDHashes, String[] docIDs, boolean overrideSchedule,
- int hopcountMethod, long currentTime, double[] documentPriorities,
+ int hopcountMethod, long currentTime, IPriorityCalculator[] documentPriorities,
String[][] prereqEventNames)
throws ManifoldCFException;
@@ -621,15 +618,14 @@ public interface IJobManager
*@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
*@param priority is the desired document priority for the document.
*@param prereqEventNames are the events that must be completed before the document can be processed.
- *@return true if the priority value was used, false otherwise.
*/
- public boolean addDocument(String processID,
+ public void addDocument(String processID,
Long jobID, String[] legalLinkTypes,
String docIDHash, String docID,
String parentIdentifierHash,
String relationshipType,
int hopcountMethod, String[] dataNames, Object[][] dataValues,
- long currentTime, double priority, String[] prereqEventNames)
+ long currentTime, IPriorityCalculator priority, String[] prereqEventNames)
throws ManifoldCFException;
/** Add documents to the queue in bulk.
@@ -652,15 +648,14 @@ public interface IJobManager
*@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
*@param priorities are the desired document priorities for the documents.
*@param prereqEventNames are the events that must be completed before each document can be processed.
- *@return an array of boolean values indicating whether or not the passed-in priority value was used or not for each doc id (true if used).
*/
- public boolean[] addDocuments(String processID,
+ public void addDocuments(String processID,
Long jobID, String[] legalLinkTypes,
String[] docIDHashes, String[] docIDs,
String parentIdentifierHash,
String relationshipType,
int hopcountMethod, String[][] dataNames, Object[][][] dataValues,
- long currentTime, double[] priorities,
+ long currentTime, IPriorityCalculator[] priorities,
String[][] prereqEventNames)
throws ManifoldCFException;
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java Mon Dec 2 11:27:35 2013
@@ -60,10 +60,6 @@ public class QueueTracker
/** These are the accumulated performance averages for all connections etc. */
protected final PerformanceStatistics performanceStatistics = new PerformanceStatistics();
- /** These are the bin counts for a prioritization pass.
- * This hash table is keyed by bin, and contains DoubleBinCount objects as values */
- protected final Map<String,DoubleBinCount> binCounts = new HashMap<String,DoubleBinCount>();
-
/** These are the bin counts for tracking the documents that are on
* the active queue, but are not being processed yet */
protected final Map<String,BinCount> queuedBinCounts = new HashMap<String,BinCount>();
@@ -71,52 +67,11 @@ public class QueueTracker
/** These are the bin counts for active threads */
protected final Map<String,BinCount> activeBinCounts = new HashMap<String,BinCount>();
- /** The "minimum depth" - which is the smallest bin count of the last document queued. This helps guarantee that documents that are
- * newly discovered don't wind up with high priority, but instead wind up about the same as the currently active document priority. */
- protected double currentMinimumDepth = 0.0;
-
- /** This flag, when set, indicates that a reset is in progress, so queuetracker bincount updates are ignored. */
- protected boolean resetInProgress = false;
-
- /** This hash table is keyed by PriorityKey objects, and contains ArrayList objects containing Doubles, in sorted order. */
- protected final Map<PriorityKey,List<Double>> availablePriorities = new HashMap<PriorityKey,List<Double>>();
-
- /** This hash table is keyed by a String (which is the bin name), and contains a Set of PriorityKey objects containing that
- * String as a bin */
- protected final Map<String,Set<PriorityKey>> binDependencies = new HashMap<String,Set<PriorityKey>>();
-
-
/** Constructor */
public QueueTracker()
{
}
- /** Reset the queue tracker.
- * This occurs ONLY when we are about to reprioritize all active documents. It does not affect the portion of the queue tracker that
- * tracks the active queue.
- */
- public void beginReset()
- {
- synchronized (binCounts)
- {
- binCounts.clear();
- currentMinimumDepth = 0.0;
- availablePriorities.clear();
- binDependencies.clear();
- resetInProgress = true;
- }
-
- }
-
- /** Finish the reset operation */
- public void endReset()
- {
- synchronized (binCounts)
- {
- resetInProgress = false;
- }
- }
-
/** Add an access record to the queue tracker. This happens when a document
* is added to the in-memory queue, and allows us to keep track of that particular event so
* we can schedule in a way that meets our distribution goals.
@@ -153,60 +108,6 @@ public class QueueTracker
}
- /** Note that a priority which was previously allocated was not used, and needs to be released.
- */
- public void notePriorityNotUsed(String[] binNames, IRepositoryConnection connection, double priority)
- {
- // If this is called, it means that a calculated document priority was given out but was not used. As such, this
- // priority can now be assigned to the next comparable document that has similar characteristics.
-
- // Since prioritization calculations are not reversible, these unused values are kept in a queue, and are used preferentially.
- PriorityKey pk = new PriorityKey(binNames);
- synchronized (binCounts)
- {
- List<Double> value = availablePriorities.get(pk);
- if (value == null)
- {
- value = new ArrayList<Double>();
- availablePriorities.put(pk,value);
- }
- // Use bisection lookup to file the current priority so that highest priority is at the end (0.0), and lowest is at the beginning
- int begin = 0;
- int end = value.size();
- while (true)
- {
- if (end == begin)
- {
- value.add(end,new Double(priority));
- break;
- }
- int middle = (begin + end) >> 1;
- Double middleValue = (Double)value.get(middle);
- if (middleValue.doubleValue() < priority)
- {
- end = middle;
- }
- else
- {
- begin = middle + 1;
- }
- }
- // Make sure the key is asserted into the binDependencies map for each bin
- int i = 0;
- while (i < binNames.length)
- {
- String binName = binNames[i++];
- Set<PriorityKey> hm = binDependencies.get(binName);
- if (hm == null)
- {
- hm = new HashSet<PriorityKey>();
- binDependencies.put(binName,hm);
- }
- hm.add(pk);
- }
- }
- }
-
/** Note the time required to successfully complete a set of documents. This allows this module to keep track of
* the performance characteristics of each individual connection, so distribution across connections can be balanced
* properly.
@@ -274,55 +175,6 @@ public class QueueTracker
}
}
- /** Assess the current minimum depth.
- * This method is called to provide to the QueueTracker information about the priorities of the documents being currently
- * queued. It is the case that it is unoptimal to assign document priorities that are fundamentally higher than this value,
- * because then the new documents will be preferentially queued, and the goal of distributing documents across bins will not be
- * adequately met.
- *@param binNamesSet is the current set of priorities we see on the queuing operation.
- */
- public void assessMinimumDepth(Double[] binNamesSet)
- {
- synchronized (binCounts)
- {
- // Ignore all numbers until reset is complete
- if (!resetInProgress)
- {
- //Logging.scheduling.debug("In assessMinimumDepth");
- int j = 0;
- double newMinPriority = Double.MAX_VALUE;
- while (j < binNamesSet.length)
- {
- Double binValue = binNamesSet[j++];
- if (binValue.doubleValue() < newMinPriority)
- newMinPriority = binValue.doubleValue();
- }
-
- if (newMinPriority != Double.MAX_VALUE)
- {
- // Convert minPriority to minDepth.
- // Note that this calculation does not take into account anything having to do with connection rates, throttling,
- // or other adjustment factors. It allows us only to obtain the "raw" minimum depth: the depth without any
- // adjustments.
- double newMinDepth = Math.exp(newMinPriority)-1.0;
-
- if (newMinDepth > currentMinimumDepth)
- {
- currentMinimumDepth = newMinDepth;
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Setting new minimum depth value to "+new Double(currentMinimumDepth).toString());
- }
- else
- {
- if (newMinDepth < currentMinimumDepth && Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Minimum depth value seems to have been set too high too early! currentMin = "+new Double(currentMinimumDepth).toString()+"; queue value = "+new Double(newMinDepth).toString());
- }
- }
- }
- }
-
- }
-
/** Note that we have completed processing of a document with a given set of bins.
* This method gets called when a Worker Thread has finished with a document.
@@ -406,344 +258,6 @@ public class QueueTracker
return rval;
}
- /** This is a made-up constant, originally based on 100 documents/second, but adjusted downward as a result of experimentation and testing, which is described as "T" below.
- */
- private final static double minMsPerFetch = 50.0;
-
- /** Calculate a document priority value. Priorities are reversed, and in log space, so that
- * zero (0.0) is considered the highest possible priority, and larger priority values are considered lower in actual priority.
- *@param binNames are the global bins to which the document belongs.
- *@param connection is the connection, from which the throttles may be obtained. More highly throttled connections are given
- * less favorable priority.
- *@return the priority value, based on recent history. Also updates statistics atomically.
- */
- public double calculatePriority(String[] binNames, IRepositoryConnection connection)
- {
- synchronized (binCounts)
- {
-
- // NOTE: We must be sure to adjust the return value by the factor calculated due to performance; a slower throttle rate
- // should yield a lower priority. In theory it should be possible to calculate an adjusted priority pretty exactly,
- // on the basis that the fetch rates of two distinct bins should grant priorities such that:
- //
- // (n documents) / (the rate of fetch (docs/millisecond) of the first bin) = milliseconds for the first bin
- //
- // should equal:
- //
- // (m documents) / (the rate of fetch of the second bin) = milliseconds for the second bin
- //
- // ... and then assigning priorities so that after a given number of document priorities are assigned from the first bin, the
- // corresponding (*m/n) number of document priorities would get assigned for the second bin.
- //
- // Suppose the maximum fetch rate for the document is F fetches per millisecond. If the document priority assigned for the Bth
- // bin member is -log(1/(1+B)) for a document fetched with no throttling whatsoever,
- // then we want the priority to be -log(1/(1+k)) for a throttled bin, where k is chosen so that:
- // k = B * ((T + 1/F)/T) = B * (1 + 1/TF)
- // ... where T is the time taken to fetch a single document that has no throttling at all.
- // For the purposes of this exercise, a value of 100 doc/sec, or T=10ms.
- //
- // Basically, for F = 0, k should be infinity, and for F = infinity, k should be B.
-
-
- // First, calculate the document's max fetch rate, in fetches per millisecond. This will be used to adjust the priority, and
- // also when resetting the bin counts.
- double[] maxFetchRates = calculateMaxFetchRates(binNames,connection);
-
- // For each bin, we will be calculating the bin count scale factor, which is what we multiply the bincount by to adjust for the
- // throttling on that bin.
- double[] binCountScaleFactors = new double[binNames.length];
-
-
- // Before calculating priority, reset any bins to a higher value, if it seems like it is appropriate. This is how we avoid assigning priorities
- // higher than the current level at which queuing is currently taking place.
-
- // First thing to do is to reset the bin values based on the current minimum. If we *do* wind up resetting, we also need to ditch any availablePriorities that match.
- int i = 0;
- while (i < binNames.length)
- {
- String binName = binNames[i];
- // Remember, maxFetchRate is in fetches per ms.
- double maxFetchRate = maxFetchRates[i];
-
- // Calculate (and save for later) the scale factor for this bin.
- double binCountScaleFactor;
- if (maxFetchRate == 0.0)
- binCountScaleFactor = Double.POSITIVE_INFINITY;
- else
- binCountScaleFactor = 1.0 + 1.0 / (minMsPerFetch * maxFetchRate);
- binCountScaleFactors[i] = binCountScaleFactor;
-
- double thisCount = 0.0;
- DoubleBinCount bc = binCounts.get(binName);
- if (bc != null)
- {
- thisCount = bc.getValue();
- }
- // Adjust the count, if needed, so that we are not assigning priorities greater than the current level we are
- // grabbing documents at
- if (thisCount * binCountScaleFactor < currentMinimumDepth)
- {
- double weightedMinimumDepth = currentMinimumDepth / binCountScaleFactor;
-
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Resetting value of bin '"+binName+"' to "+new Double(weightedMinimumDepth).toString()+"(scale factor is "+new Double(binCountScaleFactor)+")");
-
- // Clear available priorities that depend on this bin
- Set<PriorityKey> hm = binDependencies.get(binName);
- if (hm != null)
- {
- for (PriorityKey pk : hm)
- {
- availablePriorities.remove(pk);
- }
- binDependencies.remove(binName);
- }
-
- // Set a new bin value
- if (bc == null)
- {
- bc = new DoubleBinCount();
- binCounts.put(binName,bc);
- }
- bc.setValue(weightedMinimumDepth);
- }
-
- i++;
- }
-
- double returnValue;
-
- PriorityKey pk2 = new PriorityKey(binNames);
- List<Double> queuedvalue = availablePriorities.get(pk2);
- if (queuedvalue != null && queuedvalue.size() > 0)
- {
- // There's a saved value on the queue, which was calculated but not assigned earlier. We use these values preferentially.
- returnValue = ((Double)queuedvalue.remove(queuedvalue.size()-1)).doubleValue();
- if (queuedvalue.size() == 0)
- {
- i = 0;
- while (i < binNames.length)
- {
- String binName = binNames[i++];
- Set<PriorityKey> hm = binDependencies.get(binName);
- if (hm != null)
- {
- hm.remove(pk2);
- if (hm.size() == 0)
- binDependencies.remove(binName);
- }
- }
- availablePriorities.remove(pk2);
- }
- }
- else
- {
- // There was no previously-calculated value available, so we need to calculate a new value.
-
- // Find the bin with the largest effective count, and use that for the document's priority.
- // (This of course assumes that the slowest throttle is the one that wins.)
- double highestAdjustedCount = 0.0;
- i = 0;
- while (i < binNames.length)
- {
- String binName = binNames[i];
- double binCountScaleFactor = binCountScaleFactors[i];
-
- double thisCount = 0.0;
- DoubleBinCount bc = binCounts.get(binName);
- if (bc != null)
- thisCount = bc.getValue();
-
- double adjustedCount;
- // Use the scale factor already calculated above to yield a priority that is adjusted for the fetch rate.
- if (binCountScaleFactor == Double.POSITIVE_INFINITY)
- adjustedCount = Double.POSITIVE_INFINITY;
- else
- adjustedCount = thisCount * binCountScaleFactor;
- if (adjustedCount > highestAdjustedCount)
- highestAdjustedCount = adjustedCount;
- i++;
- }
-
- // Calculate the proper log value
- if (highestAdjustedCount == Double.POSITIVE_INFINITY)
- returnValue = Double.POSITIVE_INFINITY;
- else
- returnValue = Math.log(1.0 + highestAdjustedCount);
-
- // Update bins to indicate we used another priority. If more than one bin is associated with the document,
- // counts for all bins are nevertheless updated, because we don't wish to arrange scheduling collisions with hypothetical
- // documents that share any of these bins.
- int j = 0;
- while (j < binNames.length)
- {
- String binName = binNames[j];
- DoubleBinCount bc = binCounts.get(binName);
- if (bc == null)
- {
- bc = new DoubleBinCount();
- binCounts.put(binName,bc);
- }
- bc.increment();
-
- j++;
- }
-
- }
-
- if (Logging.scheduling.isDebugEnabled())
- {
- StringBuilder sb = new StringBuilder();
- int k = 0;
- while (k < binNames.length)
- {
- sb.append(binNames[k++]).append(" ");
- }
- Logging.scheduling.debug("Document with bins ["+sb.toString()+"] given priority value "+new Double(returnValue).toString());
- }
-
-
- return returnValue;
- }
- }
-
- /** Calculate the maximum fetch rate for a given set of bins for a given connection.
- * This is used to adjust the final priority of a document.
- */
- protected double[] calculateMaxFetchRates(String[] binNames, IRepositoryConnection connection)
- {
- ThrottleLimits tl = new ThrottleLimits(connection);
- return tl.getMaximumRates(binNames);
- }
-
- /** This class represents the throttle limits out of the connection specification */
- protected static class ThrottleLimits
- {
- protected ArrayList specs = new ArrayList();
-
- public ThrottleLimits(IRepositoryConnection connection)
- {
- String[] throttles = connection.getThrottles();
- int i = 0;
- while (i < throttles.length)
- {
- try
- {
- specs.add(new ThrottleLimitSpec(throttles[i],(double)connection.getThrottleValue(throttles[i])));
- }
- catch (PatternSyntaxException e)
- {
- }
- i++;
- }
- }
-
- public double[] getMaximumRates(String[] binNames)
- {
- double[] rval = new double[binNames.length];
- int j = 0;
- while (j < binNames.length)
- {
- String binName = binNames[j];
- double maxRate = Double.POSITIVE_INFINITY;
- int i = 0;
- while (i < specs.size())
- {
- ThrottleLimitSpec spec = (ThrottleLimitSpec)specs.get(i++);
- Pattern p = spec.getRegexp();
- Matcher m = p.matcher(binName);
- if (m.find())
- {
- double rate = spec.getMaxRate();
- // The direction of this inequality reflects the fact that the throttling is conservative when more rules are present.
- if (rate < maxRate)
- maxRate = rate;
- }
- }
- rval[j] = maxRate;
- j++;
- }
- return rval;
- }
-
- }
-
- /** This is a class which describes an individual throttle limit, in fetches per millisecond. */
- protected static class ThrottleLimitSpec
- {
- /** Regexp */
- protected Pattern regexp;
- /** The fetch limit for all bins matching that regexp, in fetches per millisecond */
- protected double maxRate;
-
- /** Constructor */
- public ThrottleLimitSpec(String regexp, double maxRate)
- throws PatternSyntaxException
- {
- this.regexp = Pattern.compile(regexp);
- this.maxRate = maxRate;
- }
-
- /** Get the regexp. */
- public Pattern getRegexp()
- {
- return regexp;
- }
-
- /** Get the max count */
- public double getMaxRate()
- {
- return maxRate;
- }
- }
-
- /** This is the key class for the availablePriorities table */
- protected static class PriorityKey
- {
- // The bins, in sorted order
- protected String[] binNames;
-
- /** Constructor */
- public PriorityKey(String[] binNames)
- {
- this.binNames = new String[binNames.length];
- int i = 0;
- while (i < binNames.length)
- {
- this.binNames[i] = binNames[i];
- i++;
- }
- java.util.Arrays.sort(this.binNames);
- }
-
- public int hashCode()
- {
- int rval = 0;
- int i = 0;
- while (i < binNames.length)
- {
- rval += binNames[i++].hashCode();
- }
- return rval;
- }
-
- public boolean equals(Object o)
- {
- if (!(o instanceof PriorityKey))
- return false;
- PriorityKey p = (PriorityKey)o;
- if (binNames.length != p.binNames.length)
- return false;
- int i = 0;
- while (i < binNames.length)
- {
- if (!binNames[i].equals(p.binNames[i]))
- return false;
- i++;
- }
- return true;
- }
- }
/** This is the class which allows a mutable integer count value to be saved in the bincount table.
*/
@@ -790,42 +304,5 @@ public class QueueTracker
}
}
- /** This is the class which allows a mutable integer count value to be saved in the bincount table.
- */
- protected static class DoubleBinCount
- {
- /** The count */
- protected double count = 0.0;
-
- /** Create */
- public DoubleBinCount()
- {
- }
-
- public DoubleBinCount duplicate()
- {
- DoubleBinCount rval = new DoubleBinCount();
- rval.count = this.count;
- return rval;
- }
-
- /** Increment the counter */
- public void increment()
- {
- count += 1.0;
- }
-
- /** Set the value */
- public void setValue(double count)
- {
- this.count = count;
- }
-
- /** Get the value */
- public double getValue()
- {
- return count;
- }
- }
}
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Dec 2 11:27:35 2013
@@ -39,16 +39,16 @@ public class JobManager implements IJobM
protected static final String hopLock = "_HOPLOCK_";
// Member variables
- protected IDBInterface database;
- protected IOutputConnectionManager outputMgr;
- protected IRepositoryConnectionManager connectionMgr;
- protected ILockManager lockManager;
- protected IThreadContext threadContext;
- protected JobQueue jobQueue;
- protected Jobs jobs;
- protected HopCount hopCount;
- protected Carrydown carryDown;
- protected EventManager eventManager;
+ protected final IDBInterface database;
+ protected final IOutputConnectionManager outputMgr;
+ protected final IRepositoryConnectionManager connectionMgr;
+ protected final ILockManager lockManager;
+ protected final IThreadContext threadContext;
+ protected final JobQueue jobQueue;
+ protected final Jobs jobs;
+ protected final HopCount hopCount;
+ protected final Carrydown carryDown;
+ protected final EventManager eventManager;
protected static Random random = new Random();
@@ -70,7 +70,6 @@ public class JobManager implements IJobM
outputMgr = OutputConnectionManagerFactory.make(threadContext);
connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
lockManager = LockManagerFactory.make(threadContext);
-
}
/** Install.
@@ -1933,7 +1932,7 @@ public class JobManager implements IJobM
*@param documentDescriptions are the document descriptions.
*@param priorities are the desired priorities.
*/
- public void writeDocumentPriorities(long currentTime, DocumentDescription[] documentDescriptions, double[] priorities)
+ public void writeDocumentPriorities(long currentTime, DocumentDescription[] documentDescriptions, IPriorityCalculator[] priorities)
throws ManifoldCFException
{
@@ -1972,10 +1971,8 @@ public class JobManager implements IJobM
throw new ManifoldCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
int index = x.intValue();
DocumentDescription dd = documentDescriptions[index];
- double priority = priorities[index];
- jobQueue.writeDocPriority(currentTime,dd.getID(),priorities[index]);
- if (Logging.perf.isDebugEnabled())
- Logging.perf.debug("Setting document priority for '"+dd.getDocumentIdentifier()+"' to "+new Double(priority).toString()+", set time "+new Long(currentTime).toString());
+ IPriorityCalculator priority = priorities[index];
+ jobQueue.writeDocPriority(currentTime,dd.getID(),priority);
i++;
}
database.performCommit();
@@ -3860,27 +3857,25 @@ public class JobManager implements IJobM
*@param currentTime is the current time in milliseconds since epoch.
*@param documentPriorities are the document priorities corresponding to the document identifiers.
*@param prereqEventNames are the events that must be completed before each document can be processed.
- *@return true if the priority value(s) were used, false otherwise.
*/
@Override
- public boolean[] addDocumentsInitial(String processID, Long jobID, String[] legalLinkTypes,
+ public void addDocumentsInitial(String processID, Long jobID, String[] legalLinkTypes,
String[] docIDHashes, String[] docIDs, boolean overrideSchedule,
- int hopcountMethod, long currentTime, double[] documentPriorities,
+ int hopcountMethod, long currentTime, IPriorityCalculator[] documentPriorities,
String[][] prereqEventNames)
throws ManifoldCFException
{
if (docIDHashes.length == 0)
- return new boolean[0];
+ return;
// The document identifiers need to be sorted in a consistent fashion to reduce deadlock, and have duplicates removed, before going ahead.
// But, the documentPriorities and the return booleans need to correspond to the initial array. So, after we come up with
// our internal order, we need to construct a map that takes an original index and maps it to the reduced, reordered index.
String[] reorderedDocIDHashes = eliminateDuplicates(docIDHashes);
HashMap reorderMap = buildReorderMap(docIDHashes,reorderedDocIDHashes);
- double[] reorderedDocumentPriorities = new double[reorderedDocIDHashes.length];
+ IPriorityCalculator[] reorderedDocumentPriorities = new IPriorityCalculator[reorderedDocIDHashes.length];
String[][] reorderedDocumentPrerequisites = new String[reorderedDocIDHashes.length][];
String[] reorderedDocumentIdentifiers = new String[reorderedDocIDHashes.length];
- boolean[] rval = new boolean[docIDHashes.length];
int i = 0;
while (i < docIDHashes.length)
{
@@ -3894,7 +3889,6 @@ public class JobManager implements IJobM
reorderedDocumentPrerequisites[newPosition.intValue()] = null;
reorderedDocumentIdentifiers[newPosition.intValue()] = docIDs[i];
}
- rval[i] = false;
i++;
}
@@ -3919,12 +3913,11 @@ public class JobManager implements IJobM
" initial docs and hopcounts for job "+jobID.toString());
// Go through document id's one at a time, in order - mainly to prevent deadlock as much as possible. Search for any existing row in jobqueue first (for update)
- boolean[] reorderedRval = new boolean[reorderedDocIDHashes.length];
int z = 0;
while (z < reorderedDocIDHashes.length)
{
String docIDHash = reorderedDocIDHashes[z];
- double docPriority = reorderedDocumentPriorities[z];
+ IPriorityCalculator docPriority = reorderedDocumentPriorities[z];
String docID = reorderedDocumentIdentifiers[z];
String[] docPrereqs = reorderedDocumentPrerequisites[z];
@@ -3943,7 +3936,6 @@ public class JobManager implements IJobM
IResultSet set = database.performQuery(sb.toString(),list,null,null);
- boolean priorityUsed;
long executeTime = overrideSchedule?0L:-1L;
if (set.getRowCount() > 0)
@@ -3956,16 +3948,15 @@ public class JobManager implements IJobM
int status = jobQueue.stringToStatus((String)row.getValue(jobQueue.statusField));
Long checkTimeValue = (Long)row.getValue(jobQueue.checkTimeField);
- priorityUsed = jobQueue.updateExistingRecordInitial(rowID,status,checkTimeValue,executeTime,currentTime,docPriority,docPrereqs,processID);
+ jobQueue.updateExistingRecordInitial(rowID,status,checkTimeValue,executeTime,currentTime,docPriority,docPrereqs,processID);
}
else
{
// Not found. Attempt an insert instead. This may fail due to constraints, but if this happens, the whole transaction will be retried.
jobQueue.insertNewRecordInitial(jobID,docIDHash,docID,docPriority,executeTime,currentTime,docPrereqs,processID);
- priorityUsed = true;
}
- reorderedRval[z++] = priorityUsed;
+ z++;
}
if (Logging.perf.isDebugEnabled())
@@ -3983,17 +3974,7 @@ public class JobManager implements IJobM
Logging.perf.debug("Took "+new Long(System.currentTimeMillis()-startTime).toString()+" ms to add "+Integer.toString(reorderedDocIDHashes.length)+
" initial docs and hopcounts for job "+jobID.toString());
- // Rejigger to correspond with calling order
- i = 0;
- while (i < docIDs.length)
- {
- Integer finalPosition = (Integer)reorderMap.get(new Integer(i));
- if (finalPosition != null)
- rval[i] = reorderedRval[finalPosition.intValue()];
- i++;
- }
-
- return rval;
+ return;
}
catch (ManifoldCFException e)
{
@@ -4388,20 +4369,19 @@ public class JobManager implements IJobM
*@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
*@param documentPriorities are the desired document priorities for the documents.
*@param prereqEventNames are the events that must be completed before a document can be queued.
- *@return an array of boolean values indicating whether or not the passed-in priority value was used or not for each doc id (true if used).
*/
@Override
- public boolean[] addDocuments(String processID,
+ public void addDocuments(String processID,
Long jobID, String[] legalLinkTypes,
String[] docIDHashes, String[] docIDs,
String parentIdentifierHash, String relationshipType,
int hopcountMethod, String[][] dataNames, Object[][][] dataValues,
- long currentTime, double[] documentPriorities,
+ long currentTime, IPriorityCalculator[] documentPriorities,
String[][] prereqEventNames)
throws ManifoldCFException
{
if (docIDs.length == 0)
- return new boolean[0];
+ return;
// Sort the id hashes and eliminate duplicates. This will help avoid deadlock conditions.
// However, we also need to keep the carrydown data in synch, so track that around as well, and merge if there are
@@ -4458,7 +4438,7 @@ public class JobManager implements IJobM
String[] reorderedDocIDHashes = eliminateDuplicates(docIDHashes);
HashMap reorderMap = buildReorderMap(docIDHashes,reorderedDocIDHashes);
- double[] reorderedDocumentPriorities = new double[reorderedDocIDHashes.length];
+ IPriorityCalculator[] reorderedDocumentPriorities = new IPriorityCalculator[reorderedDocIDHashes.length];
String[][] reorderedDocumentPrerequisites = new String[reorderedDocIDHashes.length][];
String[] reorderedDocumentIdentifiers = new String[reorderedDocIDHashes.length];
boolean[] rval = new boolean[docIDHashes.length];
@@ -4556,8 +4536,6 @@ public class JobManager implements IJobM
IResultSet set = database.performQuery(sb.toString(),list,null,null);
- boolean priorityUsed;
-
if (set.getRowCount() > 0)
{
// Found a row, and it is now locked.
@@ -4586,25 +4564,19 @@ public class JobManager implements IJobM
if (parentIdentifierHash != null && relationshipType != null)
hopcountChangesSeen = hopCount.recordReferences(jobID,legalLinkTypes,parentIdentifierHash,reorderedDocIDHashes,relationshipType,hopcountMethod,processID);
- // Loop through the document id's again, and perform updates where needed
- boolean[] reorderedRval = new boolean[reorderedDocIDHashes.length];
-
boolean reactivateRemovedHopcountRecords = false;
for (int z = 0; z < reorderedDocIDHashes.length; z++)
{
String docIDHash = reorderedDocIDHashes[z];
JobqueueRecord jr = (JobqueueRecord)existingRows.get(docIDHash);
- if (jr == null)
- // It was an insert
- reorderedRval[z] = true;
- else
+ if (jr != null)
{
// It was an existing row; do the update logic
// The hopcountChangesSeen array describes whether each reference is a new one. This
// helps us determine whether we're going to need to "flip" HOPCOUNTREMOVED documents
// to the PENDING state. If the new link ended in an existing record, THEN we need to flip them all!
- reorderedRval[z] = jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
+ jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
0L,currentTime,carrydownChangesSeen[z] || (hopcountChangesSeen!=null && hopcountChangesSeen[z]),
reorderedDocumentPriorities[z],reorderedDocumentPrerequisites[z]);
// Signal if we need to perform the flip
@@ -4624,16 +4596,7 @@ public class JobManager implements IJobM
Logging.perf.debug("Took "+new Long(System.currentTimeMillis()-startTime).toString()+" ms to add "+Integer.toString(reorderedDocIDHashes.length)+
" docs and hopcounts for job "+jobID.toString()+" parent identifier hash "+parentIdentifierHash);
- i = 0;
- while (i < docIDHashes.length)
- {
- Integer finalPosition = (Integer)reorderMap.get(new Integer(i));
- if (finalPosition != null)
- rval[i] = reorderedRval[finalPosition.intValue()];
- i++;
- }
-
- return rval;
+ return;
}
catch (ManifoldCFException e)
{
@@ -4688,20 +4651,19 @@ public class JobManager implements IJobM
*@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
*@param priority is the desired document priority for the document.
*@param prereqEventNames are the events that must be completed before the document can be processed.
- *@return true if the priority value was used, false otherwise.
*/
@Override
- public boolean addDocument(String processID,
+ public void addDocument(String processID,
Long jobID, String[] legalLinkTypes, String docIDHash, String docID,
String parentIdentifierHash, String relationshipType,
int hopcountMethod, String[] dataNames, Object[][] dataValues,
- long currentTime, double priority, String[] prereqEventNames)
+ long currentTime, IPriorityCalculator priority, String[] prereqEventNames)
throws ManifoldCFException
{
- return addDocuments(processID,jobID,legalLinkTypes,
+ addDocuments(processID,jobID,legalLinkTypes,
new String[]{docIDHash},new String[]{docID},
parentIdentifierHash,relationshipType,hopcountMethod,new String[][]{dataNames},
- new Object[][][]{dataValues},currentTime,new double[]{priority},new String[][]{prereqEventNames})[0];
+ new Object[][][]{dataValues},currentTime,new IPriorityCalculator[]{priority},new String[][]{prereqEventNames});
}
/** Complete adding child documents to the queue, for a set of documents.
@@ -4713,6 +4675,7 @@ public class JobManager implements IJobM
*@return the set of documents for which carrydown data was changed by this operation. These documents are likely
* to be requeued as a result of the change.
*/
+ @Override
public DocumentDescription[] finishDocuments(Long jobID, String[] legalLinkTypes, String[] parentIdentifierHashes, int hopcountMethod)
throws ManifoldCFException
{
@@ -4948,6 +4911,7 @@ public class JobManager implements IJobM
/** Complete an event sequence.
*@param eventName is the name of the event.
*/
+ @Override
public void completeEventSequence(String eventName)
throws ManifoldCFException
{
@@ -4960,13 +4924,13 @@ public class JobManager implements IJobM
* extent that if one is *already* being processed, it will need to be done over again.
*@param documentDescriptions is the set of description objects for the documents that have had their parent carrydown information changed.
*@param docPriorities are the document priorities to assign to the documents, if needed.
- *@return a flag for each document priority, true if it was used, false otherwise.
*/
- public boolean[] carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, double[] docPriorities)
+ @Override
+ public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, IPriorityCalculator[] docPriorities)
throws ManifoldCFException
{
if (documentDescriptions.length == 0)
- return new boolean[0];
+ return;
// Order the updates by document hash, to prevent deadlock as much as possible.
@@ -4985,8 +4949,6 @@ public class JobManager implements IJobM
// Sort the hashes
java.util.Arrays.sort(docIDHashes);
- boolean[] rval = new boolean[docIDHashes.length];
-
// Enter transaction and prepare to look up document states in dochash order
while (true)
{
@@ -5041,13 +5003,10 @@ public class JobManager implements IJobM
int originalIndex = ((Integer)docHashMap.get(docIDHash)).intValue();
JobqueueRecord jr = (JobqueueRecord)existingRows.get(docIDHash);
- if (jr == null)
- // It wasn't found, so the doc priority wasn't used.
- rval[originalIndex] = false;
- else
+ if (jr != null)
// It was an existing row; do the update logic; use the 'carrydown changes' flag = true all the time.
- rval[originalIndex] = jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
- 0L,currentTime,true,docPriorities[originalIndex],null);
+ jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
+ 0L,currentTime,true,docPriorities[originalIndex],null);
j++;
}
database.performCommit();
@@ -5076,7 +5035,6 @@ public class JobManager implements IJobM
sleepFor(sleepAmt);
}
}
- return rval;
}
/** Requeue a document because of carrydown changes.
@@ -5084,12 +5042,12 @@ public class JobManager implements IJobM
* extent that if it is *already* being processed, it will need to be done over again.
*@param documentDescription is the description object for the document that has had its parent carrydown information changed.
*@param docPriority is the document priority to assign to the document, if needed.
- *@return a flag for the document priority, true if it was used, false otherwise.
*/
- public boolean carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, double docPriority)
+ @Override
+ public void carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, IPriorityCalculator docPriority)
throws ManifoldCFException
{
- return carrydownChangeDocumentMultiple(new DocumentDescription[]{documentDescription},currentTime,new double[]{docPriority})[0];
+ carrydownChangeDocumentMultiple(new DocumentDescription[]{documentDescription},currentTime,new IPriorityCalculator[]{docPriority});
}
/** Sleep a random amount of time after a transaction abort.
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Dec 2 11:27:35 2013
@@ -858,12 +858,12 @@ public class JobQueue extends org.apache
}
/** Write out a document priority */
- public void writeDocPriority(long currentTime, Long rowID, double priority)
+ public void writeDocPriority(long currentTime, Long rowID, IPriorityCalculator priority)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(prioritySetField,new Long(currentTime));
- map.put(docPriorityField,new Double(priority));
+ map.put(docPriorityField,new Double(priority.getDocumentPriority()));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,rowID)});
@@ -1177,19 +1177,18 @@ public class JobQueue extends org.apache
/** Update an existing record (as the result of an initial add).
* The record is presumed to exist and have been locked, via "FOR UPDATE".
*/
- public boolean updateExistingRecordInitial(Long recordID, int currentStatus, Long checkTimeValue,
- long desiredExecuteTime, long currentTime, double desiredPriority, String[] prereqEvents,
+ public void updateExistingRecordInitial(Long recordID, int currentStatus, Long checkTimeValue,
+ long desiredExecuteTime, long currentTime, IPriorityCalculator desiredPriority, String[] prereqEvents,
String processID)
throws ManifoldCFException
{
// The general rule here is:
// If doesn't exist, make a PENDING entry.
- // If PENDING, keep it as PENDING.
+ // If PENDING, keep it as PENDING.
// If COMPLETE, make a PENDING entry.
// If PURGATORY, make a PENDINGPURGATORY entry.
// Leave everything else alone and do nothing.
- boolean rval = false;
HashMap map = new HashMap();
switch (currentStatus)
{
@@ -1217,9 +1216,8 @@ public class JobQueue extends org.apache
map.put(failTimeField,null);
map.put(failCountField,null);
// Update the doc priority.
- map.put(docPriorityField,new Double(desiredPriority));
+ map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
map.put(prioritySetField,new Long(currentTime));
- rval = true;
break;
case STATUS_PENDING:
@@ -1267,7 +1265,6 @@ public class JobQueue extends org.apache
// Insert prereqevent entries, if any
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(0,1,0);
- return rval;
}
/** Insert a new record into the jobqueue table (as part of adding an initial reference).
@@ -1276,7 +1273,7 @@ public class JobQueue extends org.apache
*@param docHash is the hash of the local document identifier.
*@param docID is the local document identifier.
*/
- public void insertNewRecordInitial(Long jobID, String docHash, String docID, double desiredDocPriority,
+ public void insertNewRecordInitial(Long jobID, String docHash, String docID, IPriorityCalculator desiredDocPriority,
long desiredExecuteTime, long currentTime, String[] prereqEvents, String processID)
throws ManifoldCFException
{
@@ -1296,7 +1293,7 @@ public class JobQueue extends org.apache
map.put(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED));
map.put(seedingProcessIDField,processID);
// Set the document priority
- map.put(docPriorityField,new Double(desiredDocPriority));
+ map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
map.put(prioritySetField,new Long(currentTime));
performInsert(map,null);
prereqEventManager.addRows(recordID,prereqEvents);
@@ -1476,14 +1473,12 @@ public class JobQueue extends org.apache
/** Update an existing record (as the result of a reference add).
* The record is presumed to exist and have been locked, via "FOR UPDATE".
- *@return true if the document priority slot has been retained, false if freed.
*/
- public boolean updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
+ public void updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
long desiredExecuteTime, long currentTime, boolean otherChangesSeen,
- double desiredPriority, String[] prereqEvents)
+ IPriorityCalculator desiredPriority, String[] prereqEvents)
throws ManifoldCFException
{
- boolean rval = false;
HashMap map = new HashMap();
switch (currentStatus)
{
@@ -1497,9 +1492,8 @@ public class JobQueue extends org.apache
map.put(failTimeField,null);
map.put(failCountField,null);
// Going into pending: set the docpriority.
- map.put(docPriorityField,new Double(desiredPriority));
+ map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
map.put(prioritySetField,new Long(currentTime));
- rval = true;
break;
case STATUS_COMPLETE:
case STATUS_BEINGCLEANED:
@@ -1515,17 +1509,16 @@ public class JobQueue extends org.apache
map.put(failTimeField,null);
map.put(failCountField,null);
// Going into pending: set the docpriority.
- map.put(docPriorityField,new Double(desiredPriority));
+ map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
map.put(prioritySetField,new Long(currentTime));
- rval = true;
break;
}
- return rval;
+ return;
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
// Document is in the queue, but already needs a rescan for prior reasons.
// We're done.
- return rval;
+ return;
case STATUS_ACTIVE:
// Document is in the queue.
// The problem here is that we have no idea when the document is actually being worked on; we only find out when the document is actually *done*.
@@ -1545,12 +1538,11 @@ public class JobQueue extends org.apache
map.put(failTimeField,null);
map.put(failCountField,null);
// Going into pending: set the docpriority.
- map.put(docPriorityField,new Double(desiredPriority));
+ map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
map.put(prioritySetField,new Long(currentTime));
- rval = true;
break;
}
- return rval;
+ return;
case STATUS_ACTIVEPURGATORY:
// Document is in the queue.
// The problem here is that we have no idea when the document is actually being worked on; we only find out when the document is actually *done*.
@@ -1570,12 +1562,11 @@ public class JobQueue extends org.apache
map.put(failTimeField,null);
map.put(failCountField,null);
// Going into pending: set the docpriority.
- map.put(docPriorityField,new Double(desiredPriority));
+ map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
map.put(prioritySetField,new Long(currentTime));
- rval = true;
break;
}
- return rval;
+ return;
case STATUS_PENDING:
// Document is already waiting to be processed.
// Bump up the schedule, if called for. Otherwise, just leave it alone.
@@ -1584,7 +1575,7 @@ public class JobQueue extends org.apache
{
long currentExecuteTime = cv.longValue();
if (currentExecuteTime <= desiredExecuteTime)
- return rval;
+ return;
}
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -1598,7 +1589,7 @@ public class JobQueue extends org.apache
// Also, leave doc priority alone
// Fall through...
default:
- return rval;
+ return;
}
prereqEventManager.deleteRows(recordID);
ArrayList list = new ArrayList();
@@ -1607,13 +1598,13 @@ public class JobQueue extends org.apache
performUpdate(map,"WHERE "+query,list,null);
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(0,1,0);
- return rval;
+ return;
}
/** Insert a new record into the jobqueue table (as part of adding a child reference).
*
*/
- public void insertNewRecord(Long jobID, String docIDHash, String docID, double desiredDocPriority, long desiredExecuteTime,
+ public void insertNewRecord(Long jobID, String docIDHash, String docID, IPriorityCalculator desiredDocPriority, long desiredExecuteTime,
long currentTime, String[] prereqEvents)
throws ManifoldCFException
{
@@ -1627,7 +1618,7 @@ public class JobQueue extends org.apache
map.put(docIDField,docID);
map.put(statusField,statusToString(STATUS_PENDING));
// Be sure to set the priority also
- map.put(docPriorityField,new Double(desiredDocPriority));
+ map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
map.put(prioritySetField,new Long(currentTime));
performInsert(map,null);
prereqEventManager.addRows(recordID,prereqEvents);