You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/02 12:27:36 UTC

svn commit: r1546965 [1/2] - in /manifoldcf/trunk: ./ framework/ framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ framework/pull-agent/src/main/java/org/apache/m...

Author: kwright
Date: Mon Dec  2 11:27:35 2013
New Revision: 1546965

URL: http://svn.apache.org/r1546965
Log:
Scheduler changes to support CONNECTORS-781.  Introduce a new database table to track bins, and implement minimumDepth using lock manager constructs to work cross-cluster.  WARNING!!! Schema change!

Added:
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/bins/
      - copied from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/bins/
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/BinManagerFactory.java
      - copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/BinManagerFactory.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IBinManager.java
      - copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IBinManager.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IPriorityCalculator.java
      - copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IPriorityCalculator.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PriorityCalculator.java
      - copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PriorityCalculator.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java
      - copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java
    manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/NullOutputConnector.java
      - copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/NullOutputConnector.java
    manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulerHSQLDBTest.java
      - copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulerHSQLDBTest.java
    manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulerTester.java
      - copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulerTester.java
    manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulingRepositoryConnector.java
      - copied unchanged from r1546964, manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/SchedulingRepositoryConnector.java
Modified:
    manifoldcf/trunk/   (props changed)
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
    manifoldcf/trunk/framework/build.xml
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
    manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java
    manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java
    manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java
    manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java
    manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java

Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
  Merged /manifoldcf/branches/CONNECTORS-781:r1545632-1546964

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java Mon Dec  2 11:27:35 2013
@@ -70,8 +70,9 @@ public interface IAgent
   * Call this method to clean up dangling persistent state when a cluster is just starting
   * to come up.  This method CANNOT be called when there are any active agents
   * processes at all.
+  *@param processID is the current process ID.
   */
-  public void cleanUpAgentData(IThreadContext threadContext)
+  public void cleanUpAllAgentData(IThreadContext threadContext, String currentProcessID)
     throws ManifoldCFException;
   
   /** Cleanup after agents process.
@@ -79,9 +80,10 @@ public interface IAgent
   * This method CANNOT be called when the agent is active, but it can
   * be called at any time and by any process in order to guarantee that a terminated
   * agent does not block other agents from completing their tasks.
-  *@param processID is the process ID of the agent to clean up after.
+  *@param currentProcessID is the current process ID.
+  *@param cleanupProcessID is the process ID of the agent to clean up after.
   */
-  public void cleanUpAgentData(IThreadContext threadContext, String processID)
+  public void cleanUpAgentData(IThreadContext threadContext, String currentProcessID, String cleanupProcessID)
     throws ManifoldCFException;
 
   /** Start the agent.  This method should spin up the agent threads, and

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java Mon Dec  2 11:27:35 2013
@@ -166,6 +166,8 @@ public class ManifoldCF extends org.apac
   /** Agent service name prefix (followed by agent class name) */
   public static final String agentServicePrefix = "AGENT_";
   
+  protected static AgentsThread agentsThread = null;
+
   /** Run agents process.
   * This method will not return until a shutdown signal is sent.
   */
@@ -178,24 +180,68 @@ public class ManifoldCF extends org.apac
     if (lockManager.checkGlobalFlag(agentShutdownSignal))
       return;
 
+    // Create and start agents thread.
+    startAgents(threadContext, processID);
+    
     while (true)
     {
       // Any shutdown signal yet?
       if (lockManager.checkGlobalFlag(agentShutdownSignal))
         break;
           
-      // Start whatever agents need to be started
-      checkAgents(threadContext, processID);
-
       try
       {
-        ManifoldCF.sleep(5000);
+        ManifoldCF.sleep(5000L);
       }
       catch (InterruptedException e)
       {
         break;
       }
     }
+    
+  }
+
+  /** Start agents thread.
+  */
+  public static void startAgents(IThreadContext threadContext, String processID)
+    throws ManifoldCFException
+  {
+    // Create and start agents thread.
+    agentsThread = new AgentsThread(processID);
+    agentsThread.start();
+  }
+  
+  /** Stop all started agents.
+  */
+  public static void stopAgents(IThreadContext threadContext, String processID)
+    throws ManifoldCFException
+  {
+    // Shut down agents background thread.
+    while (agentsThread != null)
+    {
+      agentsThread.interrupt();
+      if (!agentsThread.isAlive())
+        agentsThread = null;
+    }
+    
+    // Shut down running agents services directly.
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    synchronized (runningHash)
+    {
+      // This is supposedly safe; iterator remove is used
+      Iterator<String> iter = runningHash.keySet().iterator();
+      while (iter.hasNext())
+      {
+        String className = iter.next();
+        IAgent agent = runningHash.get(className);
+        // Stop it
+        agent.stopAgent(threadContext);
+        lockManager.endServiceActivity(getAgentsClassServiceType(className), processID);
+        iter.remove();
+        agent.cleanUp(threadContext);
+      }
+    }
+    // Done.
   }
 
   protected static String getAgentsClassServiceType(String agentClassName)
@@ -203,22 +249,80 @@ public class ManifoldCF extends org.apac
     return agentServicePrefix + agentClassName;
   }
   
+  /** Agents thread.  This runs in background until interrupted, at which point
+  * it shuts down.  Its responsibilities include cleaning up after dead processes,
+  * as well as starting newly-registered agent processes, and terminating ones that disappear.
+  */
+  protected static class AgentsThread extends Thread
+  {
+    protected final String processID;
+    
+    public AgentsThread(String processID)
+    {
+      super();
+      this.processID = processID;
+      setName("Agents thread");
+      setDaemon(true);
+    }
+    
+    public void run()
+    {
+      try
+      {
+        IThreadContext threadContext = ThreadContextFactory.make();
+        while (true)
+        {
+          try
+          {
+            if (Thread.currentThread().isInterrupted())
+              throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+            checkAgents(threadContext, processID);
+            ManifoldCF.sleep(5000L);
+          }
+          catch (InterruptedException e)
+          {
+            break;
+          }
+          catch (ManifoldCFException e)
+          {
+            if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+              break;
+            Logging.agents.error("Exception tossed: "+e.getMessage(),e);
+          }
+          catch (OutOfMemoryError e)
+          {
+            System.err.println("Agents process ran out of memory - shutting down");
+            e.printStackTrace(System.err);
+            System.exit(-200);
+          }
+          catch (Throwable e)
+          {
+            Logging.agents.fatal("Error tossed: "+e.getMessage(),e);
+          }
+        }
+      }
+      catch (Throwable e)
+      {
+        // Severe error on initialization
+        System.err.println("Agents process could not start - shutting down");
+        Logging.agents.fatal("AgentThread initialization error tossed: "+e.getMessage(),e);
+        System.exit(-300);
+      }
+    }
+  }
+
   /** Start all not-running agents.
   *@param threadContext is the thread context.
   */
-  public static void checkAgents(IThreadContext threadContext, String processID)
+  protected static void checkAgents(IThreadContext threadContext, String processID)
     throws ManifoldCFException
   {
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     // Get agent manager
     IAgentManager manager = AgentManagerFactory.make(threadContext);
-    ManifoldCFException problem = null;
     synchronized (runningHash)
     {
-      // DO NOT permit this method to do anything if stopAgents() has ever been called for this JVM! 
-      // (If it has, it means that the JVM is trying to shut down.)
-      if (stopAgentsRun)
-        return;
       String[] classes = manager.getAllAgents();
       Set<String> currentAgentClasses = new HashSet<String>();
 
@@ -235,7 +339,7 @@ public class ManifoldCF extends org.apac
           {
             // Throw a lock, so that cleanup processes and startup processes don't collide.
             String serviceType = getAgentsClassServiceType(className);
-            lockManager.registerServiceBeginServiceActivity(serviceType, processID, new CleanupAgent(threadContext, agent));
+            lockManager.registerServiceBeginServiceActivity(serviceType, processID, new CleanupAgent(threadContext, agent, processID));
             // There is a potential race condition where the agent has been started but hasn't yet appeared in runningHash.
             // But having runningHash be the synchronizer for this activity will prevent any problems.
             agent.startAgent(threadContext, processID);
@@ -244,8 +348,9 @@ public class ManifoldCF extends org.apac
           }
           catch (ManifoldCFException e)
           {
-            problem = e;
-            agent.cleanUp(threadContext);
+            if (e.getErrorCode() != ManifoldCFException.INTERRUPTED)
+              agent.cleanUp(threadContext);
+            throw e;
           }
         }
         currentAgentClasses.add(className);
@@ -260,25 +365,15 @@ public class ManifoldCF extends org.apac
         {
           // Shut down this one agent.
           IAgent agent = runningHash.get(runningAgentClass);
-          try
-          {
-            // Stop it
-            agent.stopAgent(threadContext);
-            lockManager.endServiceActivity(getAgentsClassServiceType(runningAgentClass), processID);
-            runningAgentsIterator.remove();
-            agent.cleanUp(threadContext);
-          }
-          catch (ManifoldCFException e)
-          {
-            problem = e;
-          }
+          // Stop it
+          agent.stopAgent(threadContext);
+          lockManager.endServiceActivity(getAgentsClassServiceType(runningAgentClass), processID);
+          runningAgentsIterator.remove();
+          agent.cleanUp(threadContext);
         }
       }
     }
 
-    if (problem != null)
-      throw problem;
-    
     synchronized (runningHash)
     {
       // For every class we're supposed to be running, find registered but no-longer-active instances and clean
@@ -286,7 +381,7 @@ public class ManifoldCF extends org.apac
       for (String agentsClass : runningHash.keySet())
       {
         IAgent agent = runningHash.get(agentsClass);
-        IServiceCleanup cleanup = new CleanupAgent(threadContext, agent);
+        IServiceCleanup cleanup = new CleanupAgent(threadContext, agent, processID);
         String agentsClassServiceType = getAgentsClassServiceType(agentsClass);
         while (!lockManager.cleanupInactiveService(agentsClassServiceType, cleanup))
         {
@@ -297,39 +392,18 @@ public class ManifoldCF extends org.apac
     
   }
 
-  /** Stop all started agents.
-  */
-  public static void stopAgents(IThreadContext threadContext, String processID)
-    throws ManifoldCFException
-  {
-    ILockManager lockManager = LockManagerFactory.make(threadContext);
-    synchronized (runningHash)
-    {
-      // This is supposedly safe; iterator remove is used
-      Iterator<String> iter = runningHash.keySet().iterator();
-      while (iter.hasNext())
-      {
-        String className = iter.next();
-        IAgent agent = runningHash.get(className);
-        // Stop it
-        agent.stopAgent(threadContext);
-        lockManager.endServiceActivity(getAgentsClassServiceType(className), processID);
-        iter.remove();
-        agent.cleanUp(threadContext);
-      }
-    }
-    // Done.
-  }
   
   protected static class CleanupAgent implements IServiceCleanup
   {
     protected final IAgent agent;
     protected final IThreadContext threadContext;
-    
-    public CleanupAgent(IThreadContext threadContext, IAgent agent)
+    protected final String processID;
+
+    public CleanupAgent(IThreadContext threadContext, IAgent agent, String processID)
     {
       this.agent = agent;
       this.threadContext = threadContext;
+      this.processID = processID;
     }
     
     /** Clean up after the specified service.  This method will block any startup of the specified
@@ -340,7 +414,7 @@ public class ManifoldCF extends org.apac
     public void cleanUpService(String serviceName)
       throws ManifoldCFException
     {
-      agent.cleanUpAgentData(threadContext, serviceName);
+      agent.cleanUpAgentData(threadContext, processID, serviceName);
     }
 
     /** Clean up after ALL services of the type on the cluster.
@@ -349,7 +423,7 @@ public class ManifoldCF extends org.apac
     public void cleanUpAllServices()
       throws ManifoldCFException
     {
-      agent.cleanUpAgentData(threadContext);
+      agent.cleanUpAllAgentData(threadContext, processID);
     }
     
     /** Perform cluster initialization - that is, whatever is needed presuming that the
@@ -360,8 +434,7 @@ public class ManifoldCF extends org.apac
     public void clusterInit()
       throws ManifoldCFException
     {
-      // MHL - we really want a separate clusterInit in agents
-      agent.cleanUpAgentData(threadContext);
+      agent.clusterInit(threadContext);
     }
 
   }

Modified: manifoldcf/trunk/framework/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/build.xml?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/build.xml (original)
+++ manifoldcf/trunk/framework/build.xml Mon Dec  2 11:27:35 2013
@@ -1486,6 +1486,25 @@
         </junit>
     </target>
 
+    <target name="run-pull-agent-tests" depends="compile-pull-agent,compile-pull-agent-tests">
+        <mkdir dir="test-output"/>
+        <junit fork="true" maxmemory="128m" dir="test-output" outputtoformatters="true" showoutput="true" haltonfailure="true">
+            <classpath>
+                <path refid="framework-classpath"/>
+                <pathelement location="build/core/classes"/>
+                <pathelement location="build/core-tests/classes"/>
+                <pathelement location="build/agents/classes"/>
+                <pathelement location="build/agents-tests/classes"/>
+                <pathelement location="build/pull-agent/classes"/>
+                <pathelement location="build/pull-agent-tests/classes"/>
+            </classpath>
+            <formatter type="brief" usefile="false"/>
+
+            <test name="org.apache.manifoldcf.crawler.tests.SchedulerHSQLDBTest" todir="test-output"/>
+
+        </junit>
+    </target>
+
     <target name="run-script-engine-tests" depends="compile-core,compile-script-engine,compile-script-engine-tests">
         <mkdir dir="test-output"/>
         <junit fork="true" maxmemory="128m" dir="test-output" outputtoformatters="true" showoutput="true" haltonfailure="true">
@@ -1519,7 +1538,7 @@
         </junit>
     </target>
 
-    <target name="run-tests" depends="compile-tests,run-core-tests,run-script-engine-tests"/>
+    <target name="run-tests" depends="compile-tests,run-core-tests,run-pull-agent-tests,run-script-engine-tests"/>
 
     <target name="run-tests-derby" depends="compile-tests">
         <mkdir dir="test-derby-output"/>

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java Mon Dec  2 11:27:35 2013
@@ -64,6 +64,7 @@ public abstract class BaseRepositoryConn
   * This must return a model value as specified above.
   *@return the model type value.
   */
+  @Override
   public int getConnectorModel()
   {
     // Return the simplest model - full everything
@@ -73,6 +74,7 @@ public abstract class BaseRepositoryConn
   /** Return the list of activities that this connector supports (i.e. writes into the log).
   *@return the list.
   */
+  @Override
   public String[] getActivitiesList()
   {
     return new String[0];
@@ -81,6 +83,7 @@ public abstract class BaseRepositoryConn
   /** Return the list of relationship types that this connector recognizes.
   *@return the list.
   */
+  @Override
   public String[] getRelationshipTypes()
   {
     // The base situation is that there are no relationships.
@@ -97,6 +100,7 @@ public abstract class BaseRepositoryConn
   *@return the set of bin names.  If an empty array is returned, it is equivalent to there being no request
   * rate throttling available for this identifier.
   */
+  @Override
   public String[] getBinNames(String documentIdentifier)
   {
     // Base version has one bin for all documents.  Use empty string for this since "*" would make
@@ -111,6 +115,7 @@ public abstract class BaseRepositoryConn
   *@param command is the command, which is taken directly from the API request.
   *@return true if the resource is found, false if not.  In either case, output may be filled in.
   */
+  @Override
   public boolean requestInfo(Configuration output, String command)
     throws ManifoldCFException
   {
@@ -143,6 +148,7 @@ public abstract class BaseRepositoryConn
   *@param endTime is the end of the time range to consider, exclusive.
   *@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
   */
+  @Override
   public void addSeedDocuments(ISeedingActivity activities, DocumentSpecification spec,
     long startTime, long endTime, int jobMode)
     throws ManifoldCFException, ServiceInterruption
@@ -285,6 +291,7 @@ public abstract class BaseRepositoryConn
   * Empty version strings indicate that there is no versioning ability for the corresponding document, and the document
   * will always be processed.
   */
+  @Override
   public String[] getDocumentVersions(String[] documentIdentifiers, String[] oldVersions, IVersionActivity activities,
     DocumentSpecification spec, int jobMode, boolean usesDefaultAuthority)
     throws ManifoldCFException, ServiceInterruption
@@ -387,6 +394,7 @@ public abstract class BaseRepositoryConn
   *@param documentIdentifiers is the set of document identifiers.
   *@param versions is the corresponding set of version identifiers (individual identifiers may be null).
   */
+  @Override
   public void releaseDocumentVersions(String[] documentIdentifiers, String[] versions)
     throws ManifoldCFException
   {
@@ -396,6 +404,7 @@ public abstract class BaseRepositoryConn
   /** Get the maximum number of documents to amalgamate together into one batch, for this connector.
   *@return the maximum number. 0 indicates "unlimited".
   */
+  @Override
   public int getMaxDocumentRequest()
   {
     // Base implementation does one at a time.
@@ -416,6 +425,7 @@ public abstract class BaseRepositoryConn
   * should only find other references, and should not actually call the ingestion methods.
   *@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
   */
+  @Override
   public void processDocuments(String[] documentIdentifiers, String[] versions, IProcessActivity activities,
     DocumentSpecification spec, boolean[] scanOnly, int jobMode)
     throws ManifoldCFException, ServiceInterruption
@@ -461,6 +471,7 @@ public abstract class BaseRepositoryConn
   *@param ds is the current document specification for this job.
   *@param tabsArray is an array of tab names.  Add to this array any tab names that are specific to the connector.
   */
+  @Override
   public void outputSpecificationHeader(IHTTPOutput out, Locale locale, DocumentSpecification ds, List<String> tabsArray)
     throws ManifoldCFException, IOException
   {
@@ -502,6 +513,7 @@ public abstract class BaseRepositoryConn
   *@param ds is the current document specification for this job.
   *@param tabName is the current tab name.
   */
+  @Override
   public void outputSpecificationBody(IHTTPOutput out, Locale locale, DocumentSpecification ds, String tabName)
     throws ManifoldCFException, IOException
   {
@@ -532,6 +544,7 @@ public abstract class BaseRepositoryConn
   *@return null if all is well, or a string error message if there is an error that should prevent saving of
   * the job (and cause a redirection to an error page).
   */
+  @Override
   public String processSpecificationPost(IPostParameters variableContext, Locale locale, DocumentSpecification ds)
     throws ManifoldCFException
   {
@@ -561,6 +574,7 @@ public abstract class BaseRepositoryConn
   *@param locale is the locale the output is preferred to be in.
   *@param ds is the current document specification for this job.
   */
+  @Override
   public void viewSpecification(IHTTPOutput out, Locale locale, DocumentSpecification ds)
     throws ManifoldCFException, IOException
   {

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Dec  2 11:27:35 2013
@@ -228,7 +228,7 @@ public interface IJobManager
   *@param descriptions are the document descriptions.
   *@param priorities are the desired priorities.
   */
-  public void writeDocumentPriorities(long currentTime, DocumentDescription[] descriptions, double[] priorities)
+  public void writeDocumentPriorities(long currentTime, DocumentDescription[] descriptions, IPriorityCalculator[] priorities)
     throws ManifoldCFException;
 
   // This method supports the "expiration" thread
@@ -410,9 +410,8 @@ public interface IJobManager
   * extent that if one is *already* being processed, it will need to be done over again.
   *@param documentDescriptions is the set of description objects for the documents that have had their parent carrydown information changed.
   *@param docPriorities are the document priorities to assign to the documents, if needed.
-  *@return a flag for each document priority, true if it was used, false otherwise.
   */
-  public boolean[] carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, double[] docPriorities)
+  public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, IPriorityCalculator[] docPriorities)
     throws ManifoldCFException;
 
   /** Requeue a document because of carrydown changes.
@@ -420,9 +419,8 @@ public interface IJobManager
   * extent that if it is *already* being processed, it will need to be done over again.
   *@param documentDescription is the description object for the document that has had its parent carrydown information changed.
   *@param docPriority is the document priority to assign to the document, if needed.
-  *@return a flag for the document priority, true if it was used, false otherwise.
   */
-  public boolean carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, double docPriority)
+  public void carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, IPriorityCalculator docPriority)
     throws ManifoldCFException;
 
   /** Requeue a document for further processing in the future.
@@ -528,12 +526,11 @@ public interface IJobManager
   *@param currentTime is the current time in milliseconds since epoch.
   *@param documentPriorities are the document priorities corresponding to the document identifiers.
   *@param prereqEventNames are the events that must be completed before each document can be processed.
-  *@return true if the priority value(s) were used, false otherwise.
   */
-  public boolean[] addDocumentsInitial(String processID,
+  public void addDocumentsInitial(String processID,
     Long jobID, String[] legalLinkTypes,
     String[] docIDHashes, String[] docIDs, boolean overrideSchedule,
-    int hopcountMethod, long currentTime, double[] documentPriorities,
+    int hopcountMethod, long currentTime, IPriorityCalculator[] documentPriorities,
     String[][] prereqEventNames)
     throws ManifoldCFException;
 
@@ -621,15 +618,14 @@ public interface IJobManager
   *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
   *@param priority is the desired document priority for the document.
   *@param prereqEventNames are the events that must be completed before the document can be processed.
-  *@return true if the priority value was used, false otherwise.
   */
-  public boolean addDocument(String processID,
+  public void addDocument(String processID,
     Long jobID, String[] legalLinkTypes,
     String docIDHash, String docID,
     String parentIdentifierHash,
     String relationshipType,
     int hopcountMethod, String[] dataNames, Object[][] dataValues,
-    long currentTime, double priority, String[] prereqEventNames)
+    long currentTime, IPriorityCalculator priority, String[] prereqEventNames)
     throws ManifoldCFException;
 
   /** Add documents to the queue in bulk.
@@ -652,15 +648,14 @@ public interface IJobManager
   *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
   *@param priorities are the desired document priorities for the documents.
   *@param prereqEventNames are the events that must be completed before each document can be processed.
-  *@return an array of boolean values indicating whether or not the passed-in priority value was used or not for each doc id (true if used).
   */
-  public boolean[] addDocuments(String processID,
+  public void addDocuments(String processID,
     Long jobID, String[] legalLinkTypes,
     String[] docIDHashes, String[] docIDs,
     String parentIdentifierHash,
     String relationshipType,
     int hopcountMethod, String[][] dataNames, Object[][][] dataValues,
-    long currentTime, double[] priorities,
+    long currentTime, IPriorityCalculator[] priorities,
     String[][] prereqEventNames)
     throws ManifoldCFException;
 

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java Mon Dec  2 11:27:35 2013
@@ -60,10 +60,6 @@ public class QueueTracker
   /** These are the accumulated performance averages for all connections etc. */
   protected final PerformanceStatistics performanceStatistics = new PerformanceStatistics();
 
-  /** These are the bin counts for a prioritization pass.
-  * This hash table is keyed by bin, and contains DoubleBinCount objects as values */
-  protected final Map<String,DoubleBinCount> binCounts = new HashMap<String,DoubleBinCount>();
-
   /** These are the bin counts for tracking the documents that are on
   * the active queue, but are not being processed yet */
   protected final Map<String,BinCount> queuedBinCounts = new HashMap<String,BinCount>();
@@ -71,52 +67,11 @@ public class QueueTracker
   /** These are the bin counts for active threads */
   protected final Map<String,BinCount> activeBinCounts = new HashMap<String,BinCount>();
 
-  /** The "minimum depth" - which is the smallest bin count of the last document queued.  This helps guarantee that documents that are
-  * newly discovered don't wind up with high priority, but instead wind up about the same as the currently active document priority. */
-  protected double currentMinimumDepth = 0.0;
-
-  /** This flag, when set, indicates that a reset is in progress, so queuetracker bincount updates are ignored. */
-  protected boolean resetInProgress = false;
-
-  /** This hash table is keyed by PriorityKey objects, and contains ArrayList objects containing Doubles, in sorted order. */
-  protected final Map<PriorityKey,List<Double>> availablePriorities = new HashMap<PriorityKey,List<Double>>();
-
-  /** This hash table is keyed by a String (which is the bin name), and contains a Set of PriorityKey objects containing that
-  * String as a bin */
-  protected final Map<String,Set<PriorityKey>> binDependencies = new HashMap<String,Set<PriorityKey>>();
-
-
   /** Constructor */
   public QueueTracker()
   {
   }
 
-  /** Reset the queue tracker.
-  * This occurs ONLY when we are about to reprioritize all active documents.  It does not affect the portion of the queue tracker that
-  * tracks the active queue.
-  */
-  public void beginReset()
-  {
-    synchronized (binCounts)
-    {
-      binCounts.clear();
-      currentMinimumDepth = 0.0;
-      availablePriorities.clear();
-      binDependencies.clear();
-      resetInProgress = true;
-    }
-
-  }
-
-  /** Finish the reset operation */
-  public void endReset()
-  {
-    synchronized (binCounts)
-    {
-      resetInProgress = false;
-    }
-  }
-
   /** Add an access record to the queue tracker.  This happens when a document
   * is added to the in-memory queue, and allows us to keep track of that particular event so
   * we can schedule in a way that meets our distribution goals.
@@ -153,60 +108,6 @@ public class QueueTracker
 
   }
 
-  /** Note that a priority which was previously allocated was not used, and needs to be released.
-  */
-  public void notePriorityNotUsed(String[] binNames, IRepositoryConnection connection, double priority)
-  {
-    // If this is called, it means that a calculated document priority was given out but was not used.  As such, this
-    // priority can now be assigned to the next comparable document that has similar characteristics.
-
-    // Since prioritization calculations are not reversible, these unused values are kept in a queue, and are used preferentially.
-    PriorityKey pk = new PriorityKey(binNames);
-    synchronized (binCounts)
-    {
-      List<Double> value = availablePriorities.get(pk);
-      if (value == null)
-      {
-        value = new ArrayList<Double>();
-        availablePriorities.put(pk,value);
-      }
-      // Use bisection lookup to file the current priority so that highest priority is at the end (0.0), and lowest is at the beginning
-      int begin = 0;
-      int end = value.size();
-      while (true)
-      {
-        if (end == begin)
-        {
-          value.add(end,new Double(priority));
-          break;
-        }
-        int middle = (begin + end) >> 1;
-        Double middleValue = (Double)value.get(middle);
-        if (middleValue.doubleValue() < priority)
-        {
-          end = middle;
-        }
-        else
-        {
-          begin = middle + 1;
-        }
-      }
-      // Make sure the key is asserted into the binDependencies map for each bin
-      int i = 0;
-      while (i < binNames.length)
-      {
-        String binName = binNames[i++];
-        Set<PriorityKey> hm = binDependencies.get(binName);
-        if (hm == null)
-        {
-          hm = new HashSet<PriorityKey>();
-          binDependencies.put(binName,hm);
-        }
-        hm.add(pk);
-      }
-    }
-  }
-
   /** Note the time required to successfully complete a set of documents.  This allows this module to keep track of
   * the performance characteristics of each individual connection, so distribution across connections can be balanced
   * properly.
@@ -274,55 +175,6 @@ public class QueueTracker
     }
   }
 
-  /** Assess the current minimum depth.
-  * This method is called to provide to the QueueTracker information about the priorities of the documents being currently
-  * queued.  It is the case that it is unoptimal to assign document priorities that are fundamentally higher than this value,
-  * because then the new documents will be preferentially queued, and the goal of distributing documents across bins will not be
-  * adequately met.
-  *@param binNamesSet is the current set of priorities we see on the queuing operation.
-  */
-  public void assessMinimumDepth(Double[] binNamesSet)
-  {
-    synchronized (binCounts)
-    {
-      // Ignore all numbers until reset is complete
-      if (!resetInProgress)
-      {
-        //Logging.scheduling.debug("In assessMinimumDepth");
-        int j = 0;
-        double newMinPriority = Double.MAX_VALUE;
-        while (j < binNamesSet.length)
-        {
-          Double binValue = binNamesSet[j++];
-          if (binValue.doubleValue() < newMinPriority)
-            newMinPriority = binValue.doubleValue();
-        }
-
-        if (newMinPriority != Double.MAX_VALUE)
-        {
-          // Convert minPriority to minDepth.
-          // Note that this calculation does not take into account anything having to do with connection rates, throttling,
-          // or other adjustment factors.  It allows us only to obtain the "raw" minimum depth: the depth without any
-          // adjustments.
-          double newMinDepth = Math.exp(newMinPriority)-1.0;
-
-          if (newMinDepth > currentMinimumDepth)
-          {
-            currentMinimumDepth = newMinDepth;
-            if (Logging.scheduling.isDebugEnabled())
-              Logging.scheduling.debug("Setting new minimum depth value to "+new Double(currentMinimumDepth).toString());
-          }
-          else
-          {
-            if (newMinDepth < currentMinimumDepth && Logging.scheduling.isDebugEnabled())
-              Logging.scheduling.debug("Minimum depth value seems to have been set too high too early! currentMin = "+new Double(currentMinimumDepth).toString()+"; queue value = "+new Double(newMinDepth).toString());
-          }
-        }
-      }
-    }
-
-  }
-
 
   /** Note that we have completed processing of a document with a given set of bins.
   * This method gets called when a Worker Thread has finished with a document.
@@ -406,344 +258,6 @@ public class QueueTracker
     return rval;
   }
 
-  /** This is a made-up constant, originally based on 100 documents/second, but adjusted downward as a result of experimentation and testing, which is described as "T" below.
-  */
-  private final static double minMsPerFetch = 50.0;
-
-  /** Calculate a document priority value.  Priorities are reversed, and in log space, so that
-  * zero (0.0) is considered the highest possible priority, and larger priority values are considered lower in actual priority.
-  *@param binNames are the global bins to which the document belongs.
-  *@param connection is the connection, from which the throttles may be obtained.  More highly throttled connections are given
-  *          less favorable priority.
-  *@return the priority value, based on recent history.  Also updates statistics atomically.
-  */
-  public double calculatePriority(String[] binNames, IRepositoryConnection connection)
-  {
-    synchronized (binCounts)
-    {
-
-      // NOTE: We must be sure to adjust the return value by the factor calculated due to performance; a slower throttle rate
-      // should yield a lower priority.  In theory it should be possible to calculate an adjusted priority pretty exactly,
-      // on the basis that the fetch rates of two distinct bins should grant priorities such that:
-      //
-      //  (n documents) / (the rate of fetch (docs/millisecond) of the first bin) = milliseconds for the first bin
-      //
-      //  should equal:
-      //
-      //  (m documents) / (the rate of fetch of the second bin) = milliseconds for the second bin
-      //
-      // ... and then assigning priorities so that after a given number of document priorities are assigned from the first bin, the
-      // corresponding (*m/n) number of document priorities would get assigned for the second bin.
-      //
-      // Suppose the maximum fetch rate for the document is F fetches per millisecond.  If the document priority assigned for the Bth
-      // bin member is -log(1/(1+B)) for a document fetched with no throttling whatsoever,
-      // then we want the priority to be -log(1/(1+k)) for a throttled bin, where k is chosen so that:
-      // k = B * ((T + 1/F)/T) = B * (1 + 1/TF)
-      // ... where T is the time taken to fetch a single document that has no throttling at all.
-      // For the purposes of this exercise, a value of 100 doc/sec, or T=10ms.
-      //
-      // Basically, for F = 0, k should be infinity, and for F = infinity, k should be B.
-
-
-      // First, calculate the document's max fetch rate, in fetches per millisecond.  This will be used to adjust the priority, and
-      // also when resetting the bin counts.
-      double[] maxFetchRates = calculateMaxFetchRates(binNames,connection);
-
-      // For each bin, we will be calculating the bin count scale factor, which is what we multiply the bincount by to adjust for the
-      // throttling on that bin.
-      double[] binCountScaleFactors = new double[binNames.length];
-
-
-      // Before calculating priority, reset any bins to a higher value, if it seems like it is appropriate.  This is how we avoid assigning priorities
-      // higher than the current level at which queuing is currently taking place.
-
-      // First thing to do is to reset the bin values based on the current minimum.  If we *do* wind up resetting, we also need to ditch any availablePriorities that match.
-      int i = 0;
-      while (i < binNames.length)
-      {
-        String binName = binNames[i];
-        // Remember, maxFetchRate is in fetches per ms.
-        double maxFetchRate = maxFetchRates[i];
-
-        // Calculate (and save for later) the scale factor for this bin.
-        double binCountScaleFactor;
-        if (maxFetchRate == 0.0)
-          binCountScaleFactor = Double.POSITIVE_INFINITY;
-        else
-          binCountScaleFactor = 1.0 + 1.0 / (minMsPerFetch * maxFetchRate);
-        binCountScaleFactors[i] = binCountScaleFactor;
-
-        double thisCount = 0.0;
-        DoubleBinCount bc = binCounts.get(binName);
-        if (bc != null)
-        {
-          thisCount = bc.getValue();
-        }
-        // Adjust the count, if needed, so that we are not assigning priorities greater than the current level we are
-        // grabbing documents at
-        if (thisCount * binCountScaleFactor < currentMinimumDepth)
-        {
-          double weightedMinimumDepth = currentMinimumDepth / binCountScaleFactor;
-
-          if (Logging.scheduling.isDebugEnabled())
-            Logging.scheduling.debug("Resetting value of bin '"+binName+"' to "+new Double(weightedMinimumDepth).toString()+"(scale factor is "+new Double(binCountScaleFactor)+")");
-
-          // Clear available priorities that depend on this bin
-          Set<PriorityKey> hm = binDependencies.get(binName);
-          if (hm != null)
-          {
-            for (PriorityKey pk : hm)
-            {
-              availablePriorities.remove(pk);
-            }
-            binDependencies.remove(binName);
-          }
-
-          // Set a new bin value
-          if (bc == null)
-          {
-            bc = new DoubleBinCount();
-            binCounts.put(binName,bc);
-          }
-          bc.setValue(weightedMinimumDepth);
-        }
-
-        i++;
-      }
-
-      double returnValue;
-
-      PriorityKey pk2 = new PriorityKey(binNames);
-      List<Double> queuedvalue = availablePriorities.get(pk2);
-      if (queuedvalue != null && queuedvalue.size() > 0)
-      {
-        // There's a saved value on the queue, which was calculated but not assigned earlier.  We use these values preferentially.
-        returnValue = ((Double)queuedvalue.remove(queuedvalue.size()-1)).doubleValue();
-        if (queuedvalue.size() == 0)
-        {
-          i = 0;
-          while (i < binNames.length)
-          {
-            String binName = binNames[i++];
-            Set<PriorityKey> hm = binDependencies.get(binName);
-            if (hm != null)
-            {
-              hm.remove(pk2);
-              if (hm.size() == 0)
-                binDependencies.remove(binName);
-            }
-          }
-          availablePriorities.remove(pk2);
-        }
-      }
-      else
-      {
-        // There was no previously-calculated value available, so we need to calculate a new value.
-
-        // Find the bin with the largest effective count, and use that for the document's priority.
-        // (This of course assumes that the slowest throttle is the one that wins.)
-        double highestAdjustedCount = 0.0;
-        i = 0;
-        while (i < binNames.length)
-        {
-          String binName = binNames[i];
-          double binCountScaleFactor = binCountScaleFactors[i];
-
-          double thisCount = 0.0;
-          DoubleBinCount bc = binCounts.get(binName);
-          if (bc != null)
-            thisCount = bc.getValue();
-
-          double adjustedCount;
-          // Use the scale factor already calculated above to yield a priority that is adjusted for the fetch rate.
-          if (binCountScaleFactor == Double.POSITIVE_INFINITY)
-            adjustedCount = Double.POSITIVE_INFINITY;
-          else
-            adjustedCount = thisCount * binCountScaleFactor;
-          if (adjustedCount > highestAdjustedCount)
-            highestAdjustedCount = adjustedCount;
-          i++;
-        }
-
-        // Calculate the proper log value
-        if (highestAdjustedCount == Double.POSITIVE_INFINITY)
-          returnValue = Double.POSITIVE_INFINITY;
-        else
-          returnValue = Math.log(1.0 + highestAdjustedCount);
-
-        // Update bins to indicate we used another priority.  If more than one bin is associated with the document,
-        // counts for all bins are nevertheless updated, because we don't wish to arrange scheduling collisions with hypothetical
-        // documents that share any of these bins.
-        int j = 0;
-        while (j < binNames.length)
-        {
-          String binName = binNames[j];
-          DoubleBinCount bc = binCounts.get(binName);
-          if (bc == null)
-          {
-            bc = new DoubleBinCount();
-            binCounts.put(binName,bc);
-          }
-          bc.increment();
-
-          j++;
-        }
-
-      }
-
-      if (Logging.scheduling.isDebugEnabled())
-      {
-        StringBuilder sb = new StringBuilder();
-        int k = 0;
-        while (k < binNames.length)
-        {
-          sb.append(binNames[k++]).append(" ");
-        }
-        Logging.scheduling.debug("Document with bins ["+sb.toString()+"] given priority value "+new Double(returnValue).toString());
-      }
-
-
-      return returnValue;
-    }
-  }
-
-  /** Calculate the maximum fetch rate for a given set of bins for a given connection.
-  * This is used to adjust the final priority of a document.
-  */
-  protected double[] calculateMaxFetchRates(String[] binNames, IRepositoryConnection connection)
-  {
-    ThrottleLimits tl = new ThrottleLimits(connection);
-    return tl.getMaximumRates(binNames);
-  }
-
-  /** This class represents the throttle limits out of the connection specification */
-  protected static class ThrottleLimits
-  {
-    protected ArrayList specs = new ArrayList();
-
-    public ThrottleLimits(IRepositoryConnection connection)
-    {
-      String[] throttles = connection.getThrottles();
-      int i = 0;
-      while (i < throttles.length)
-      {
-        try
-        {
-          specs.add(new ThrottleLimitSpec(throttles[i],(double)connection.getThrottleValue(throttles[i])));
-        }
-        catch (PatternSyntaxException e)
-        {
-        }
-        i++;
-      }
-    }
-
-    public double[] getMaximumRates(String[] binNames)
-    {
-      double[] rval = new double[binNames.length];
-      int j = 0;
-      while (j < binNames.length)
-      {
-        String binName = binNames[j];
-        double maxRate = Double.POSITIVE_INFINITY;
-        int i = 0;
-        while (i < specs.size())
-        {
-          ThrottleLimitSpec spec = (ThrottleLimitSpec)specs.get(i++);
-          Pattern p = spec.getRegexp();
-          Matcher m = p.matcher(binName);
-          if (m.find())
-          {
-            double rate = spec.getMaxRate();
-            // The direction of this inequality reflects the fact that the throttling is conservative when more rules are present.
-            if (rate < maxRate)
-              maxRate = rate;
-          }
-        }
-        rval[j] = maxRate;
-        j++;
-      }
-      return rval;
-    }
-
-  }
-
-  /** This is a class which describes an individual throttle limit, in fetches per millisecond. */
-  protected static class ThrottleLimitSpec
-  {
-    /** Regexp */
-    protected Pattern regexp;
-    /** The fetch limit for all bins matching that regexp, in fetches per millisecond */
-    protected double maxRate;
-
-    /** Constructor */
-    public ThrottleLimitSpec(String regexp, double maxRate)
-      throws PatternSyntaxException
-    {
-      this.regexp = Pattern.compile(regexp);
-      this.maxRate = maxRate;
-    }
-
-    /** Get the regexp. */
-    public Pattern getRegexp()
-    {
-      return regexp;
-    }
-
-    /** Get the max count */
-    public double getMaxRate()
-    {
-      return maxRate;
-    }
-  }
-
-  /** This is the key class for the availablePriorities table */
-  protected static class PriorityKey
-  {
-    // The bins, in sorted order
-    protected String[] binNames;
-
-    /** Constructor */
-    public PriorityKey(String[] binNames)
-    {
-      this.binNames = new String[binNames.length];
-      int i = 0;
-      while (i < binNames.length)
-      {
-        this.binNames[i] = binNames[i];
-        i++;
-      }
-      java.util.Arrays.sort(this.binNames);
-    }
-
-    public int hashCode()
-    {
-      int rval = 0;
-      int i = 0;
-      while (i < binNames.length)
-      {
-        rval += binNames[i++].hashCode();
-      }
-      return rval;
-    }
-
-    public boolean equals(Object o)
-    {
-      if (!(o instanceof PriorityKey))
-        return false;
-      PriorityKey p = (PriorityKey)o;
-      if (binNames.length != p.binNames.length)
-        return false;
-      int i = 0;
-      while (i < binNames.length)
-      {
-        if (!binNames[i].equals(p.binNames[i]))
-          return false;
-        i++;
-      }
-      return true;
-    }
-  }
 
   /** This is the class which allows a mutable integer count value to be saved in the bincount table.
   */
@@ -790,42 +304,5 @@ public class QueueTracker
     }
   }
 
-  /** This is the class which allows a mutable integer count value to be saved in the bincount table.
-  */
-  protected static class DoubleBinCount
-  {
-    /** The count */
-    protected double count = 0.0;
-
-    /** Create */
-    public DoubleBinCount()
-    {
-    }
-
-    public DoubleBinCount duplicate()
-    {
-      DoubleBinCount rval = new DoubleBinCount();
-      rval.count = this.count;
-      return rval;
-    }
-
-    /** Increment the counter */
-    public void increment()
-    {
-      count += 1.0;
-    }
-
-    /** Set the value */
-    public void setValue(double count)
-    {
-      this.count = count;
-    }
-
-    /** Get the value */
-    public double getValue()
-    {
-      return count;
-    }
-  }
 
 }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Dec  2 11:27:35 2013
@@ -39,16 +39,16 @@ public class JobManager implements IJobM
   protected static final String hopLock = "_HOPLOCK_";
 
   // Member variables
-  protected IDBInterface database;
-  protected IOutputConnectionManager outputMgr;
-  protected IRepositoryConnectionManager connectionMgr;
-  protected ILockManager lockManager;
-  protected IThreadContext threadContext;
-  protected JobQueue jobQueue;
-  protected Jobs jobs;
-  protected HopCount hopCount;
-  protected Carrydown carryDown;
-  protected EventManager eventManager;
+  protected final IDBInterface database;
+  protected final IOutputConnectionManager outputMgr;
+  protected final IRepositoryConnectionManager connectionMgr;
+  protected final ILockManager lockManager;
+  protected final IThreadContext threadContext;
+  protected final JobQueue jobQueue;
+  protected final Jobs jobs;
+  protected final HopCount hopCount;
+  protected final Carrydown carryDown;
+  protected final EventManager eventManager;
 
 
   protected static Random random = new Random();
@@ -70,7 +70,6 @@ public class JobManager implements IJobM
     outputMgr = OutputConnectionManagerFactory.make(threadContext);
     connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
     lockManager = LockManagerFactory.make(threadContext);
-
   }
 
   /** Install.
@@ -1933,7 +1932,7 @@ public class JobManager implements IJobM
   *@param documentDescriptions are the document descriptions.
   *@param priorities are the desired priorities.
   */
-  public void writeDocumentPriorities(long currentTime, DocumentDescription[] documentDescriptions, double[] priorities)
+  public void writeDocumentPriorities(long currentTime, DocumentDescription[] documentDescriptions, IPriorityCalculator[] priorities)
     throws ManifoldCFException
   {
 
@@ -1972,10 +1971,8 @@ public class JobManager implements IJobM
             throw new ManifoldCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
           int index = x.intValue();
           DocumentDescription dd = documentDescriptions[index];
-          double priority = priorities[index];
-          jobQueue.writeDocPriority(currentTime,dd.getID(),priorities[index]);
-          if (Logging.perf.isDebugEnabled())
-            Logging.perf.debug("Setting document priority for '"+dd.getDocumentIdentifier()+"' to "+new Double(priority).toString()+", set time "+new Long(currentTime).toString());
+          IPriorityCalculator priority = priorities[index];
+          jobQueue.writeDocPriority(currentTime,dd.getID(),priority);
           i++;
         }
         database.performCommit();
@@ -3860,27 +3857,25 @@ public class JobManager implements IJobM
   *@param currentTime is the current time in milliseconds since epoch.
   *@param documentPriorities are the document priorities corresponding to the document identifiers.
   *@param prereqEventNames are the events that must be completed before each document can be processed.
-  *@return true if the priority value(s) were used, false otherwise.
   */
   @Override
-  public boolean[] addDocumentsInitial(String processID, Long jobID, String[] legalLinkTypes,
+  public void addDocumentsInitial(String processID, Long jobID, String[] legalLinkTypes,
     String[] docIDHashes, String[] docIDs, boolean overrideSchedule,
-    int hopcountMethod, long currentTime, double[] documentPriorities,
+    int hopcountMethod, long currentTime, IPriorityCalculator[] documentPriorities,
     String[][] prereqEventNames)
     throws ManifoldCFException
   {
     if (docIDHashes.length == 0)
-      return new boolean[0];
+      return;
 
     // The document identifiers need to be sorted in a consistent fashion to reduce deadlock, and have duplicates removed, before going ahead.
     // But, the documentPriorities and the return booleans need to correspond to the initial array.  So, after we come up with
     // our internal order, we need to construct a map that takes an original index and maps it to the reduced, reordered index.
     String[] reorderedDocIDHashes = eliminateDuplicates(docIDHashes);
     HashMap reorderMap = buildReorderMap(docIDHashes,reorderedDocIDHashes);
-    double[] reorderedDocumentPriorities = new double[reorderedDocIDHashes.length];
+    IPriorityCalculator[] reorderedDocumentPriorities = new IPriorityCalculator[reorderedDocIDHashes.length];
     String[][] reorderedDocumentPrerequisites = new String[reorderedDocIDHashes.length][];
     String[] reorderedDocumentIdentifiers = new String[reorderedDocIDHashes.length];
-    boolean[] rval = new boolean[docIDHashes.length];
     int i = 0;
     while (i < docIDHashes.length)
     {
@@ -3894,7 +3889,6 @@ public class JobManager implements IJobM
           reorderedDocumentPrerequisites[newPosition.intValue()] = null;
         reorderedDocumentIdentifiers[newPosition.intValue()] = docIDs[i];
       }
-      rval[i] = false;
       i++;
     }
 
@@ -3919,12 +3913,11 @@ public class JobManager implements IJobM
           " initial docs and hopcounts for job "+jobID.toString());
 
         // Go through document id's one at a time, in order - mainly to prevent deadlock as much as possible.  Search for any existing row in jobqueue first (for update)
-        boolean[] reorderedRval = new boolean[reorderedDocIDHashes.length];
         int z = 0;
         while (z < reorderedDocIDHashes.length)
         {
           String docIDHash = reorderedDocIDHashes[z];
-          double docPriority = reorderedDocumentPriorities[z];
+          IPriorityCalculator docPriority = reorderedDocumentPriorities[z];
           String docID = reorderedDocumentIdentifiers[z];
           String[] docPrereqs = reorderedDocumentPrerequisites[z];
 
@@ -3943,7 +3936,6 @@ public class JobManager implements IJobM
 
           IResultSet set = database.performQuery(sb.toString(),list,null,null);
 
-          boolean priorityUsed;
           long executeTime = overrideSchedule?0L:-1L;
 
           if (set.getRowCount() > 0)
@@ -3956,16 +3948,15 @@ public class JobManager implements IJobM
             int status = jobQueue.stringToStatus((String)row.getValue(jobQueue.statusField));
             Long checkTimeValue = (Long)row.getValue(jobQueue.checkTimeField);
 
-            priorityUsed = jobQueue.updateExistingRecordInitial(rowID,status,checkTimeValue,executeTime,currentTime,docPriority,docPrereqs,processID);
+            jobQueue.updateExistingRecordInitial(rowID,status,checkTimeValue,executeTime,currentTime,docPriority,docPrereqs,processID);
           }
           else
           {
             // Not found.  Attempt an insert instead.  This may fail due to constraints, but if this happens, the whole transaction will be retried.
             jobQueue.insertNewRecordInitial(jobID,docIDHash,docID,docPriority,executeTime,currentTime,docPrereqs,processID);
-            priorityUsed = true;
           }
 
-          reorderedRval[z++] = priorityUsed;
+          z++;
         }
 
         if (Logging.perf.isDebugEnabled())
@@ -3983,17 +3974,7 @@ public class JobManager implements IJobM
           Logging.perf.debug("Took "+new Long(System.currentTimeMillis()-startTime).toString()+" ms to add "+Integer.toString(reorderedDocIDHashes.length)+
           " initial docs and hopcounts for job "+jobID.toString());
 
-        // Rejigger to correspond with calling order
-        i = 0;
-        while (i < docIDs.length)
-        {
-          Integer finalPosition = (Integer)reorderMap.get(new Integer(i));
-          if (finalPosition != null)
-            rval[i] = reorderedRval[finalPosition.intValue()];
-          i++;
-        }
-
-        return rval;
+        return;
       }
       catch (ManifoldCFException e)
       {
@@ -4388,20 +4369,19 @@ public class JobManager implements IJobM
   *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
   *@param documentPriorities are the desired document priorities for the documents.
   *@param prereqEventNames are the events that must be completed before a document can be queued.
-  *@return an array of boolean values indicating whether or not the passed-in priority value was used or not for each doc id (true if used).
   */
   @Override
-  public boolean[] addDocuments(String processID,
+  public void addDocuments(String processID,
     Long jobID, String[] legalLinkTypes,
     String[] docIDHashes, String[] docIDs,
     String parentIdentifierHash, String relationshipType,
     int hopcountMethod, String[][] dataNames, Object[][][] dataValues,
-    long currentTime, double[] documentPriorities,
+    long currentTime, IPriorityCalculator[] documentPriorities,
     String[][] prereqEventNames)
     throws ManifoldCFException
   {
     if (docIDs.length == 0)
-      return new boolean[0];
+      return;
 
     // Sort the id hashes and eliminate duplicates.  This will help avoid deadlock conditions.
     // However, we also need to keep the carrydown data in synch, so track that around as well, and merge if there are
@@ -4458,7 +4438,7 @@ public class JobManager implements IJobM
 
     String[] reorderedDocIDHashes = eliminateDuplicates(docIDHashes);
     HashMap reorderMap = buildReorderMap(docIDHashes,reorderedDocIDHashes);
-    double[] reorderedDocumentPriorities = new double[reorderedDocIDHashes.length];
+    IPriorityCalculator[] reorderedDocumentPriorities = new IPriorityCalculator[reorderedDocIDHashes.length];
     String[][] reorderedDocumentPrerequisites = new String[reorderedDocIDHashes.length][];
     String[] reorderedDocumentIdentifiers = new String[reorderedDocIDHashes.length];
     boolean[] rval = new boolean[docIDHashes.length];
@@ -4556,8 +4536,6 @@ public class JobManager implements IJobM
               
           IResultSet set = database.performQuery(sb.toString(),list,null,null);
 
-          boolean priorityUsed;
-
           if (set.getRowCount() > 0)
           {
             // Found a row, and it is now locked.
@@ -4586,25 +4564,19 @@ public class JobManager implements IJobM
         if (parentIdentifierHash != null && relationshipType != null)
           hopcountChangesSeen = hopCount.recordReferences(jobID,legalLinkTypes,parentIdentifierHash,reorderedDocIDHashes,relationshipType,hopcountMethod,processID);
 
-        // Loop through the document id's again, and perform updates where needed
-        boolean[] reorderedRval = new boolean[reorderedDocIDHashes.length];
-
         boolean reactivateRemovedHopcountRecords = false;
         
         for (int z = 0; z < reorderedDocIDHashes.length; z++)
         {
           String docIDHash = reorderedDocIDHashes[z];
           JobqueueRecord jr = (JobqueueRecord)existingRows.get(docIDHash);
-          if (jr == null)
-            // It was an insert
-            reorderedRval[z] = true;
-          else
+          if (jr != null)
           {
             // It was an existing row; do the update logic
             // The hopcountChangesSeen array describes whether each reference is a new one.  This
             // helps us determine whether we're going to need to "flip" HOPCOUNTREMOVED documents
             // to the PENDING state.  If the new link ended in an existing record, THEN we need to flip them all!
-            reorderedRval[z] = jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
+            jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
               0L,currentTime,carrydownChangesSeen[z] || (hopcountChangesSeen!=null && hopcountChangesSeen[z]),
               reorderedDocumentPriorities[z],reorderedDocumentPrerequisites[z]);
             // Signal if we need to perform the flip
@@ -4624,16 +4596,7 @@ public class JobManager implements IJobM
           Logging.perf.debug("Took "+new Long(System.currentTimeMillis()-startTime).toString()+" ms to add "+Integer.toString(reorderedDocIDHashes.length)+
           " docs and hopcounts for job "+jobID.toString()+" parent identifier hash "+parentIdentifierHash);
 
-        i = 0;
-        while (i < docIDHashes.length)
-        {
-          Integer finalPosition = (Integer)reorderMap.get(new Integer(i));
-          if (finalPosition != null)
-            rval[i] = reorderedRval[finalPosition.intValue()];
-          i++;
-        }
-
-        return rval;
+        return;
       }
       catch (ManifoldCFException e)
       {
@@ -4688,20 +4651,19 @@ public class JobManager implements IJobM
   *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
   *@param priority is the desired document priority for the document.
   *@param prereqEventNames are the events that must be completed before the document can be processed.
-  *@return true if the priority value was used, false otherwise.
   */
   @Override
-  public boolean addDocument(String processID,
+  public void addDocument(String processID,
     Long jobID, String[] legalLinkTypes, String docIDHash, String docID,
     String parentIdentifierHash, String relationshipType,
     int hopcountMethod, String[] dataNames, Object[][] dataValues,
-    long currentTime, double priority, String[] prereqEventNames)
+    long currentTime, IPriorityCalculator priority, String[] prereqEventNames)
     throws ManifoldCFException
   {
-    return addDocuments(processID,jobID,legalLinkTypes,
+    addDocuments(processID,jobID,legalLinkTypes,
       new String[]{docIDHash},new String[]{docID},
       parentIdentifierHash,relationshipType,hopcountMethod,new String[][]{dataNames},
-      new Object[][][]{dataValues},currentTime,new double[]{priority},new String[][]{prereqEventNames})[0];
+      new Object[][][]{dataValues},currentTime,new IPriorityCalculator[]{priority},new String[][]{prereqEventNames});
   }
 
   /** Complete adding child documents to the queue, for a set of documents.
@@ -4713,6 +4675,7 @@ public class JobManager implements IJobM
   *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
   *  to be requeued as a result of the change.
   */
+  @Override
   public DocumentDescription[] finishDocuments(Long jobID, String[] legalLinkTypes, String[] parentIdentifierHashes, int hopcountMethod)
     throws ManifoldCFException
   {
@@ -4948,6 +4911,7 @@ public class JobManager implements IJobM
   /** Complete an event sequence.
   *@param eventName is the name of the event.
   */
+  @Override
   public void completeEventSequence(String eventName)
     throws ManifoldCFException
   {
@@ -4960,13 +4924,13 @@ public class JobManager implements IJobM
   * extent that if one is *already* being processed, it will need to be done over again.
   *@param documentDescriptions is the set of description objects for the documents that have had their parent carrydown information changed.
   *@param docPriorities are the document priorities to assign to the documents, if needed.
-  *@return a flag for each document priority, true if it was used, false otherwise.
   */
-  public boolean[] carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, double[] docPriorities)
+  @Override
+  public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, IPriorityCalculator[] docPriorities)
     throws ManifoldCFException
   {
     if (documentDescriptions.length == 0)
-      return new boolean[0];
+      return;
 
     // Order the updates by document hash, to prevent deadlock as much as possible.
 
@@ -4985,8 +4949,6 @@ public class JobManager implements IJobM
     // Sort the hashes
     java.util.Arrays.sort(docIDHashes);
 
-    boolean[] rval = new boolean[docIDHashes.length];
-
     // Enter transaction and prepare to look up document states in dochash order
     while (true)
     {
@@ -5041,13 +5003,10 @@ public class JobManager implements IJobM
           int originalIndex = ((Integer)docHashMap.get(docIDHash)).intValue();
 
           JobqueueRecord jr = (JobqueueRecord)existingRows.get(docIDHash);
-          if (jr == null)
-            // It wasn't found, so the doc priority wasn't used.
-            rval[originalIndex] = false;
-          else
+          if (jr != null)
             // It was an existing row; do the update logic; use the 'carrydown changes' flag = true all the time.
-            rval[originalIndex] = jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
-            0L,currentTime,true,docPriorities[originalIndex],null);
+            jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
+              0L,currentTime,true,docPriorities[originalIndex],null);
           j++;
         }
         database.performCommit();
@@ -5076,7 +5035,6 @@ public class JobManager implements IJobM
         sleepFor(sleepAmt);
       }
     }
-    return rval;
   }
 
   /** Requeue a document because of carrydown changes.
@@ -5084,12 +5042,12 @@ public class JobManager implements IJobM
   * extent that if it is *already* being processed, it will need to be done over again.
   *@param documentDescription is the description object for the document that has had its parent carrydown information changed.
   *@param docPriority is the document priority to assign to the document, if needed.
-  *@return a flag for the document priority, true if it was used, false otherwise.
   */
-  public boolean carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, double docPriority)
+  @Override
+  public void carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, IPriorityCalculator docPriority)
     throws ManifoldCFException
   {
-    return carrydownChangeDocumentMultiple(new DocumentDescription[]{documentDescription},currentTime,new double[]{docPriority})[0];
+    carrydownChangeDocumentMultiple(new DocumentDescription[]{documentDescription},currentTime,new IPriorityCalculator[]{docPriority});
   }
 
   /** Sleep a random amount of time after a transaction abort.

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Dec  2 11:27:35 2013
@@ -858,12 +858,12 @@ public class JobQueue extends org.apache
   }
 
   /** Write out a document priority */
-  public void writeDocPriority(long currentTime, Long rowID, double priority)
+  public void writeDocPriority(long currentTime, Long rowID, IPriorityCalculator priority)
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
     map.put(prioritySetField,new Long(currentTime));
-    map.put(docPriorityField,new Double(priority));
+    map.put(docPriorityField,new Double(priority.getDocumentPriority()));
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(idField,rowID)});
@@ -1177,19 +1177,18 @@ public class JobQueue extends org.apache
   /** Update an existing record (as the result of an initial add).
   * The record is presumed to exist and have been locked, via "FOR UPDATE".
   */
-  public boolean updateExistingRecordInitial(Long recordID, int currentStatus, Long checkTimeValue,
-    long desiredExecuteTime, long currentTime, double desiredPriority, String[] prereqEvents,
+  public void updateExistingRecordInitial(Long recordID, int currentStatus, Long checkTimeValue,
+    long desiredExecuteTime, long currentTime, IPriorityCalculator desiredPriority, String[] prereqEvents,
     String processID)
     throws ManifoldCFException
   {
     // The general rule here is:
     // If doesn't exist, make a PENDING entry.
-    // If PENDING, keep it as PENDING.
+    // If PENDING, keep it as PENDING.  
     // If COMPLETE, make a PENDING entry.
     // If PURGATORY, make a PENDINGPURGATORY entry.
     // Leave everything else alone and do nothing.
 
-    boolean rval = false;
     HashMap map = new HashMap();
     switch (currentStatus)
     {
@@ -1217,9 +1216,8 @@ public class JobQueue extends org.apache
       map.put(failTimeField,null);
       map.put(failCountField,null);
       // Update the doc priority.
-      map.put(docPriorityField,new Double(desiredPriority));
+      map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
       map.put(prioritySetField,new Long(currentTime));
-      rval = true;
       break;
 
     case STATUS_PENDING:
@@ -1267,7 +1265,6 @@ public class JobQueue extends org.apache
     // Insert prereqevent entries, if any
     prereqEventManager.addRows(recordID,prereqEvents);
     noteModifications(0,1,0);
-    return rval;
   }
 
   /** Insert a new record into the jobqueue table (as part of adding an initial reference).
@@ -1276,7 +1273,7 @@ public class JobQueue extends org.apache
   *@param docHash is the hash of the local document identifier.
   *@param docID is the local document identifier.
   */
-  public void insertNewRecordInitial(Long jobID, String docHash, String docID, double desiredDocPriority,
+  public void insertNewRecordInitial(Long jobID, String docHash, String docID, IPriorityCalculator desiredDocPriority,
     long desiredExecuteTime, long currentTime, String[] prereqEvents, String processID)
     throws ManifoldCFException
   {
@@ -1296,7 +1293,7 @@ public class JobQueue extends org.apache
     map.put(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED));
     map.put(seedingProcessIDField,processID);
     // Set the document priority
-    map.put(docPriorityField,new Double(desiredDocPriority));
+    map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
     map.put(prioritySetField,new Long(currentTime));
     performInsert(map,null);
     prereqEventManager.addRows(recordID,prereqEvents);
@@ -1476,14 +1473,12 @@ public class JobQueue extends org.apache
 
   /** Update an existing record (as the result of a reference add).
   * The record is presumed to exist and have been locked, via "FOR UPDATE".
-  *@return true if the document priority slot has been retained, false if freed.
   */
-  public boolean updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
+  public void updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
     long desiredExecuteTime, long currentTime, boolean otherChangesSeen,
-    double desiredPriority, String[] prereqEvents)
+    IPriorityCalculator desiredPriority, String[] prereqEvents)
     throws ManifoldCFException
   {
-    boolean rval = false;
     HashMap map = new HashMap();
     switch (currentStatus)
     {
@@ -1497,9 +1492,8 @@ public class JobQueue extends org.apache
       map.put(failTimeField,null);
       map.put(failCountField,null);
       // Going into pending: set the docpriority.
-      map.put(docPriorityField,new Double(desiredPriority));
+      map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
       map.put(prioritySetField,new Long(currentTime));
-      rval = true;
       break;
     case STATUS_COMPLETE:
     case STATUS_BEINGCLEANED:
@@ -1515,17 +1509,16 @@ public class JobQueue extends org.apache
         map.put(failTimeField,null);
         map.put(failCountField,null);
         // Going into pending: set the docpriority.
-        map.put(docPriorityField,new Double(desiredPriority));
+        map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
         map.put(prioritySetField,new Long(currentTime));
-        rval = true;
         break;
       }
-      return rval;
+      return;
     case STATUS_ACTIVENEEDRESCAN:
     case STATUS_ACTIVENEEDRESCANPURGATORY:
       // Document is in the queue, but already needs a rescan for prior reasons.
       // We're done.
-      return rval;
+      return;
     case STATUS_ACTIVE:
       // Document is in the queue.
       // The problem here is that we have no idea when the document is actually being worked on; we only find out when the document is actually *done*.
@@ -1545,12 +1538,11 @@ public class JobQueue extends org.apache
         map.put(failTimeField,null);
         map.put(failCountField,null);
         // Going into pending: set the docpriority.
-        map.put(docPriorityField,new Double(desiredPriority));
+        map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
         map.put(prioritySetField,new Long(currentTime));
-        rval = true;
         break;
       }
-      return rval;
+      return;
     case STATUS_ACTIVEPURGATORY:
       // Document is in the queue.
       // The problem here is that we have no idea when the document is actually being worked on; we only find out when the document is actually *done*.
@@ -1570,12 +1562,11 @@ public class JobQueue extends org.apache
         map.put(failTimeField,null);
         map.put(failCountField,null);
         // Going into pending: set the docpriority.
-        map.put(docPriorityField,new Double(desiredPriority));
+        map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
         map.put(prioritySetField,new Long(currentTime));
-        rval = true;
         break;
       }
-      return rval;
+      return;
     case STATUS_PENDING:
       // Document is already waiting to be processed.
       // Bump up the schedule, if called for.  Otherwise, just leave it alone.
@@ -1584,7 +1575,7 @@ public class JobQueue extends org.apache
       {
         long currentExecuteTime = cv.longValue();
         if (currentExecuteTime <= desiredExecuteTime)
-          return rval;
+          return;
       }
       map.put(checkTimeField,new Long(desiredExecuteTime));
       map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -1598,7 +1589,7 @@ public class JobQueue extends org.apache
       // Also, leave doc priority alone
       // Fall through...
     default:
-      return rval;
+      return;
     }
     prereqEventManager.deleteRows(recordID);
     ArrayList list = new ArrayList();
@@ -1607,13 +1598,13 @@ public class JobQueue extends org.apache
     performUpdate(map,"WHERE "+query,list,null);
     prereqEventManager.addRows(recordID,prereqEvents);
     noteModifications(0,1,0);
-    return rval;
+    return;
   }
 
   /** Insert a new record into the jobqueue table (as part of adding a child reference).
   *
   */
-  public void insertNewRecord(Long jobID, String docIDHash, String docID, double desiredDocPriority, long desiredExecuteTime,
+  public void insertNewRecord(Long jobID, String docIDHash, String docID, IPriorityCalculator desiredDocPriority, long desiredExecuteTime,
     long currentTime, String[] prereqEvents)
     throws ManifoldCFException
   {
@@ -1627,7 +1618,7 @@ public class JobQueue extends org.apache
     map.put(docIDField,docID);
     map.put(statusField,statusToString(STATUS_PENDING));
     // Be sure to set the priority also
-    map.put(docPriorityField,new Double(desiredDocPriority));
+    map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
     map.put(prioritySetField,new Long(currentTime));
     performInsert(map,null);
     prereqEventManager.addRows(recordID,prereqEvents);