You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2010/08/23 20:08:42 UTC
svn commit: r988237 [19/24] - in /incubator/lcf/trunk:
modules/connectors/activedirectory/connector/org/apache/acf/authorities/authorities/activedirectory/
modules/connectors/documentum/connector/org/apache/acf/crawler/authorities/DCTM/
modules/connect...
Modified: incubator/lcf/trunk/modules/framework/pull-agent/org/apache/acf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/org/apache/acf/crawler/jobs/JobManager.java?rev=988237&r1=988236&r2=988237&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/org/apache/acf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/org/apache/acf/crawler/jobs/JobManager.java Mon Aug 23 18:08:32 2010
@@ -24,7 +24,7 @@ import org.apache.acf.crawler.interfaces
import java.util.*;
import java.util.regex.*;
import org.apache.acf.crawler.system.Logging;
-import org.apache.acf.crawler.system.LCF;
+import org.apache.acf.crawler.system.ACF;
/** This is the main job manager. It provides methods that support both job definition, and the threads that execute the jobs.
*/
@@ -54,7 +54,7 @@ public class JobManager implements IJobM
*@param database is the database.
*/
public JobManager(IThreadContext threadContext, IDBInterface database)
- throws LCFException
+ throws ACFException
{
this.database = database;
this.threadContext = threadContext;
@@ -72,7 +72,7 @@ public class JobManager implements IJobM
/** Install.
*/
public void install()
- throws LCFException
+ throws ACFException
{
jobs.install(outputMgr.getTableName(),outputMgr.getConnectionNameColumn(),connectionMgr.getTableName(),connectionMgr.getConnectionNameColumn());
jobQueue.install(jobs.getTableName(),jobs.idField);
@@ -84,7 +84,7 @@ public class JobManager implements IJobM
/** Uninstall.
*/
public void deinstall()
- throws LCFException
+ throws ACFException
{
eventManager.deinstall();
carryDown.deinstall();
@@ -95,35 +95,35 @@ public class JobManager implements IJobM
/** Export configuration */
public void exportConfiguration(java.io.OutputStream os)
- throws java.io.IOException, LCFException
+ throws java.io.IOException, ACFException
{
// Write a version indicator
- LCF.writeDword(os,2);
+ ACF.writeDword(os,2);
// Get the job list
IJobDescription[] list = getAllJobs();
// Write the number of authorities
- LCF.writeDword(os,list.length);
+ ACF.writeDword(os,list.length);
// Loop through the list and write the individual repository connection info
int i = 0;
while (i < list.length)
{
IJobDescription job = list[i++];
- LCF.writeString(os,job.getConnectionName());
- LCF.writeString(os,job.getOutputConnectionName());
- LCF.writeString(os,job.getDescription());
- LCF.writeDword(os,job.getType());
- LCF.writeDword(os,job.getStartMethod());
- LCF.writeLong(os,job.getInterval());
- LCF.writeLong(os,job.getExpiration());
- LCF.writeLong(os,job.getReseedInterval());
- LCF.writeDword(os,job.getPriority());
- LCF.writeDword(os,job.getHopcountMode());
- LCF.writeString(os,job.getSpecification().toXML());
- LCF.writeString(os,job.getOutputSpecification().toXML());
+ ACF.writeString(os,job.getConnectionName());
+ ACF.writeString(os,job.getOutputConnectionName());
+ ACF.writeString(os,job.getDescription());
+ ACF.writeDword(os,job.getType());
+ ACF.writeDword(os,job.getStartMethod());
+ ACF.writeLong(os,job.getInterval());
+ ACF.writeLong(os,job.getExpiration());
+ ACF.writeLong(os,job.getReseedInterval());
+ ACF.writeDword(os,job.getPriority());
+ ACF.writeDword(os,job.getHopcountMode());
+ ACF.writeString(os,job.getSpecification().toXML());
+ ACF.writeString(os,job.getOutputSpecification().toXML());
// Write schedule
int recCount = job.getScheduleRecordCount();
- LCF.writeDword(os,recCount);
+ ACF.writeDword(os,recCount);
int j = 0;
while (j < recCount)
{
@@ -134,20 +134,20 @@ public class JobManager implements IJobM
writeEnumeratedValues(os,sr.getYear());
writeEnumeratedValues(os,sr.getHourOfDay());
writeEnumeratedValues(os,sr.getMinutesOfHour());
- LCF.writeString(os,sr.getTimezone());
- LCF.writeLong(os,sr.getDuration());
+ ACF.writeString(os,sr.getTimezone());
+ ACF.writeLong(os,sr.getDuration());
}
// Write hop count filters
Map filters = job.getHopCountFilters();
- LCF.writeDword(os,filters.size());
+ ACF.writeDword(os,filters.size());
Iterator iter = filters.keySet().iterator();
while (iter.hasNext())
{
String linkType = (String)iter.next();
Long hopcount = (Long)filters.get(linkType);
- LCF.writeString(os,linkType);
- LCF.writeLong(os,hopcount);
+ ACF.writeString(os,linkType);
+ ACF.writeLong(os,hopcount);
}
}
}
@@ -157,46 +157,46 @@ public class JobManager implements IJobM
{
if (ev == null)
{
- LCF.writeSdword(os,-1);
+ ACF.writeSdword(os,-1);
return;
}
int size = ev.size();
- LCF.writeSdword(os,size);
+ ACF.writeSdword(os,size);
Iterator iter = ev.getValues();
while (iter.hasNext())
{
- LCF.writeDword(os,((Integer)iter.next()).intValue());
+ ACF.writeDword(os,((Integer)iter.next()).intValue());
}
}
/** Import configuration */
public void importConfiguration(java.io.InputStream is)
- throws java.io.IOException, LCFException
+ throws java.io.IOException, ACFException
{
- int version = LCF.readDword(is);
+ int version = ACF.readDword(is);
if (version != 2)
throw new java.io.IOException("Unknown job configuration version: "+Integer.toString(version));
- int count = LCF.readDword(is);
+ int count = ACF.readDword(is);
int i = 0;
while (i < count)
{
IJobDescription job = createJob();
- job.setConnectionName(LCF.readString(is));
- job.setOutputConnectionName(LCF.readString(is));
- job.setDescription(LCF.readString(is));
- job.setType(LCF.readDword(is));
- job.setStartMethod(LCF.readDword(is));
- job.setInterval(LCF.readLong(is));
- job.setExpiration(LCF.readLong(is));
- job.setReseedInterval(LCF.readLong(is));
- job.setPriority(LCF.readDword(is));
- job.setHopcountMode(LCF.readDword(is));
- job.getSpecification().fromXML(LCF.readString(is));
- job.getOutputSpecification().fromXML(LCF.readString(is));
+ job.setConnectionName(ACF.readString(is));
+ job.setOutputConnectionName(ACF.readString(is));
+ job.setDescription(ACF.readString(is));
+ job.setType(ACF.readDword(is));
+ job.setStartMethod(ACF.readDword(is));
+ job.setInterval(ACF.readLong(is));
+ job.setExpiration(ACF.readLong(is));
+ job.setReseedInterval(ACF.readLong(is));
+ job.setPriority(ACF.readDword(is));
+ job.setHopcountMode(ACF.readDword(is));
+ job.getSpecification().fromXML(ACF.readString(is));
+ job.getOutputSpecification().fromXML(ACF.readString(is));
// Read schedule
- int recCount = LCF.readDword(is);
+ int recCount = ACF.readDword(is);
int j = 0;
while (j < recCount)
{
@@ -206,8 +206,8 @@ public class JobManager implements IJobM
EnumeratedValues year = readEnumeratedValues(is);
EnumeratedValues hourOfDay = readEnumeratedValues(is);
EnumeratedValues minutesOfHour = readEnumeratedValues(is);
- String timezone = LCF.readString(is);
- Long duration = LCF.readLong(is);
+ String timezone = ACF.readString(is);
+ Long duration = ACF.readLong(is);
ScheduleRecord sr = new ScheduleRecord(dayOfWeek, monthOfYear, dayOfMonth, year,
hourOfDay, minutesOfHour, timezone, duration);
@@ -216,12 +216,12 @@ public class JobManager implements IJobM
}
// Read hop count filters
- int hopFilterCount = LCF.readDword(is);
+ int hopFilterCount = ACF.readDword(is);
j = 0;
while (j < hopFilterCount)
{
- String linkType = LCF.readString(is);
- Long hopcount = LCF.readLong(is);
+ String linkType = ACF.readString(is);
+ Long hopcount = ACF.readLong(is);
job.addHopCountFilter(linkType,hopcount);
j++;
}
@@ -235,14 +235,14 @@ public class JobManager implements IJobM
protected EnumeratedValues readEnumeratedValues(java.io.InputStream is)
throws java.io.IOException
{
- int size = LCF.readSdword(is);
+ int size = ACF.readSdword(is);
if (size == -1)
return null;
int[] values = new int[size];
int i = 0;
while (i < size)
{
- values[i++] = LCF.readDword(is);
+ values[i++] = ACF.readDword(is);
}
return new EnumeratedValues(values);
}
@@ -253,7 +253,7 @@ public class JobManager implements IJobM
*@param connectionNames is the set of connection names.
*/
public void noteConnectorDeregistration(String[] connectionNames)
- throws LCFException
+ throws ACFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
StringBuffer sb = new StringBuffer();
@@ -284,7 +284,7 @@ public class JobManager implements IJobM
/** Note deregistration for a batch of connection names.
*/
protected void noteConnectionDeregistration(String query, ArrayList list)
- throws LCFException
+ throws ACFException
{
//System.out.println("Query is "+query);
// Query for the matching jobs, and then for each job potentially adjust the state
@@ -307,7 +307,7 @@ public class JobManager implements IJobM
*@param connectionNames is the set of connection names.
*/
public void noteConnectorRegistration(String[] connectionNames)
- throws LCFException
+ throws ACFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
StringBuffer sb = new StringBuffer();
@@ -338,7 +338,7 @@ public class JobManager implements IJobM
/** Note registration for a batch of connection names.
*/
protected void noteConnectionRegistration(String query, ArrayList list)
- throws LCFException
+ throws ACFException
{
// Query for the matching jobs, and then for each job potentially adjust the state
IResultSet set = database.performQuery("SELECT "+jobs.idField+","+jobs.statusField+" FROM "+
@@ -359,7 +359,7 @@ public class JobManager implements IJobM
* is signalled.
*/
public void noteConnectionChange(String connectionName)
- throws LCFException
+ throws ACFException
{
jobs.noteConnectionChange(connectionName);
}
@@ -370,7 +370,7 @@ public class JobManager implements IJobM
*@param connectionNames is the set of connection names.
*/
public void noteOutputConnectorDeregistration(String[] connectionNames)
- throws LCFException
+ throws ACFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
StringBuffer sb = new StringBuffer();
@@ -401,7 +401,7 @@ public class JobManager implements IJobM
/** Note deregistration for a batch of output connection names.
*/
protected void noteOutputConnectionDeregistration(String query, ArrayList list)
- throws LCFException
+ throws ACFException
{
//System.out.println("Query is "+query);
// Query for the matching jobs, and then for each job potentially adjust the state
@@ -424,7 +424,7 @@ public class JobManager implements IJobM
*@param connectionNames is the set of connection names.
*/
public void noteOutputConnectorRegistration(String[] connectionNames)
- throws LCFException
+ throws ACFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
StringBuffer sb = new StringBuffer();
@@ -455,7 +455,7 @@ public class JobManager implements IJobM
/** Note registration for a batch of output connection names.
*/
protected void noteOutputConnectionRegistration(String query, ArrayList list)
- throws LCFException
+ throws ACFException
{
// Query for the matching jobs, and then for each job potentially adjust the state
IResultSet set = database.performQuery("SELECT "+jobs.idField+","+jobs.statusField+" FROM "+
@@ -476,7 +476,7 @@ public class JobManager implements IJobM
* is signalled.
*/
public void noteOutputConnectionChange(String connectionName)
- throws LCFException
+ throws ACFException
{
jobs.noteOutputConnectionChange(connectionName);
}
@@ -485,7 +485,7 @@ public class JobManager implements IJobM
*@return the list, sorted by description.
*/
public IJobDescription[] getAllJobs()
- throws LCFException
+ throws ACFException
{
return jobs.getAll();
}
@@ -494,7 +494,7 @@ public class JobManager implements IJobM
*@return the new job.
*/
public IJobDescription createJob()
- throws LCFException
+ throws ACFException
{
return jobs.create();
}
@@ -512,7 +512,7 @@ public class JobManager implements IJobM
* well as remove all documents indexed by the job from the index.
*/
public void deleteJob(Long id)
- throws LCFException
+ throws ACFException
{
database.beginTransaction();
try
@@ -523,7 +523,7 @@ public class JobManager implements IJobM
IResultSet set = database.performQuery("SELECT "+jobs.statusField+","+jobs.outputNameField+" FROM "+
jobs.getTableName()+" WHERE "+jobs.idField+"=? FOR UPDATE",list,null,null);
if (set.getRowCount() == 0)
- throw new LCFException("Attempting to delete a job that doesn't exist: "+id);
+ throw new ACFException("Attempting to delete a job that doesn't exist: "+id);
IResultRow row = set.getRow(0);
String outputName = (String)row.getValue(jobs.outputNameField);
int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString());
@@ -531,9 +531,9 @@ public class JobManager implements IJobM
status == jobs.STATUS_ACTIVE_UNINSTALLED || status == jobs.STATUS_ACTIVESEEDING_UNINSTALLED ||
status == jobs.STATUS_ACTIVE_NOOUTPUT || status == jobs.STATUS_ACTIVESEEDING_NOOUTPUT ||
status == jobs.STATUS_ACTIVE_NEITHER || status == jobs.STATUS_ACTIVESEEDING_NEITHER)
- throw new LCFException("Job "+id+" is active; you must shut it down before deleting it");
+ throw new ACFException("Job "+id+" is active; you must shut it down before deleting it");
if (status != jobs.STATUS_INACTIVE)
- throw new LCFException("Job "+id+" is busy; you must wait and/or shut it down before deleting it");
+ throw new ACFException("Job "+id+" is busy; you must wait and/or shut it down before deleting it");
if (outputMgr.checkConnectorExists(outputName))
jobs.writeStatus(id,jobs.STATUS_READYFORDELETE);
else
@@ -541,7 +541,7 @@ public class JobManager implements IJobM
if (Logging.jobs.isDebugEnabled())
Logging.jobs.debug("Job "+id+" marked for deletion");
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
throw e;
@@ -563,7 +563,7 @@ public class JobManager implements IJobM
*@return null if the job doesn't exist.
*/
public IJobDescription load(Long id)
- throws LCFException
+ throws ACFException
{
return jobs.load(id,false);
}
@@ -574,7 +574,7 @@ public class JobManager implements IJobM
*@return null if the job doesn't exist.
*/
public IJobDescription load(Long id, boolean readOnly)
- throws LCFException
+ throws ACFException
{
return jobs.load(id,readOnly);
}
@@ -583,9 +583,9 @@ public class JobManager implements IJobM
*@param jobDescription is the job description.
*/
public void save(IJobDescription jobDescription)
- throws LCFException
+ throws ACFException
{
- LCF.noteConfigurationChange();
+ ACF.noteConfigurationChange();
jobs.save(jobDescription);
}
@@ -594,7 +594,7 @@ public class JobManager implements IJobM
*@return true if there is a reference, false otherwise.
*/
public boolean checkIfReference(String connectionName)
- throws LCFException
+ throws ACFException
{
return jobs.checkIfReference(connectionName);
}
@@ -604,7 +604,7 @@ public class JobManager implements IJobM
*@return true if there is a reference, false otherwise.
*/
public boolean checkIfOutputReference(String connectionName)
- throws LCFException
+ throws ACFException
{
return jobs.checkIfOutputReference(connectionName);
}
@@ -614,7 +614,7 @@ public class JobManager implements IJobM
*@return the set of job id's associated with that connection.
*/
public IJobDescription[] findJobsForConnection(String connectionName)
- throws LCFException
+ throws ACFException
{
return jobs.findJobsForConnection(connectionName);
}
@@ -632,7 +632,7 @@ public class JobManager implements IJobM
* (which is now dead), then we have to set that status back to previous value.
*/
public void prepareForStart()
- throws LCFException
+ throws ACFException
{
Logging.jobs.debug("Resetting due to restart");
while (true)
@@ -654,7 +654,7 @@ public class JobManager implements IJobM
Logging.jobs.debug("Reset complete");
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -683,7 +683,7 @@ public class JobManager implements IJobM
/** Reset as part of restoring document worker threads.
*/
public void resetDocumentWorkerStatus()
- throws LCFException
+ throws ACFException
{
Logging.jobs.debug("Resetting document active status");
while (true)
@@ -695,7 +695,7 @@ public class JobManager implements IJobM
jobQueue.resetDocumentWorkerStatus();
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -725,7 +725,7 @@ public class JobManager implements IJobM
/** Reset as part of restoring seeding threads.
*/
public void resetSeedingWorkerStatus()
- throws LCFException
+ throws ACFException
{
Logging.jobs.debug("Resetting seeding status");
jobs.resetSeedingWorkerStatus();
@@ -736,7 +736,7 @@ public class JobManager implements IJobM
/** Reset as part of restoring doc delete threads.
*/
public void resetDocDeleteWorkerStatus()
- throws LCFException
+ throws ACFException
{
Logging.jobs.debug("Resetting doc deleting status");
jobQueue.resetDocDeleteWorkerStatus();
@@ -747,7 +747,7 @@ public class JobManager implements IJobM
/** Reset as part of restoring startup threads.
*/
public void resetStartupWorkerStatus()
- throws LCFException
+ throws ACFException
{
Logging.jobs.debug("Resetting job starting up status");
jobs.resetStartupWorkerStatus();
@@ -763,7 +763,7 @@ public class JobManager implements IJobM
*@param identifiers is the set of document identifiers.
*/
public void deleteIngestedDocumentIdentifiers(DocumentDescription[] identifiers)
- throws LCFException
+ throws ACFException
{
jobQueue.deleteIngestedDocumentIdentifiers(identifiers);
// Hopcount rows get removed when the job itself is removed.
@@ -779,7 +779,7 @@ public class JobManager implements IJobM
*@return the document descriptions for these documents.
*/
public DocumentDescription[] getNextDeletableDocuments(int maxCount)
- throws LCFException
+ throws ACFException
{
// The query will be built here, because it joins the jobs table against the jobqueue
// table.
@@ -958,7 +958,7 @@ public class JobManager implements IJobM
database.signalRollback();
throw e;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -986,7 +986,7 @@ public class JobManager implements IJobM
*@return the set of documents which should be removed from the index.
*/
protected String[] getUnindexableDocumentIdentifiers(DocumentDescription[] documentIdentifiers, String connectionName)
- throws LCFException
+ throws ACFException
{
// This is where we will count the individual document id's
HashMap countMap = new HashMap();
@@ -1094,7 +1094,7 @@ public class JobManager implements IJobM
*@return the document descriptions.
*/
public DocumentDescription[] getNextAlreadyProcessedReprioritizationDocuments(long currentTime, int n)
- throws LCFException
+ throws ACFException
{
StringBuffer sb = new StringBuffer();
ArrayList list = new ArrayList();
@@ -1137,7 +1137,7 @@ public class JobManager implements IJobM
*@return the document descriptions.
*/
public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(long currentTime, int n)
- throws LCFException
+ throws ACFException
{
StringBuffer sb = new StringBuffer();
ArrayList list = new ArrayList();
@@ -1201,7 +1201,7 @@ public class JobManager implements IJobM
*@param priorities are the desired priorities.
*/
public void writeDocumentPriorities(long currentTime, DocumentDescription[] documentDescriptions, double[] priorities)
- throws LCFException
+ throws ACFException
{
// Retry loop - in case we get a deadlock despite our best efforts
@@ -1236,7 +1236,7 @@ public class JobManager implements IJobM
String docIDHash = docIDHashes[i];
Integer x = (Integer)indexMap.remove(docIDHash);
if (x == null)
- throw new LCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
+ throw new ACFException("Assertion failure: duplicate document identifier jobid/hash detected!");
int index = x.intValue();
DocumentDescription dd = documentDescriptions[index];
double priority = priorities[index];
@@ -1247,7 +1247,7 @@ public class JobManager implements IJobM
}
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -1283,7 +1283,7 @@ public class JobManager implements IJobM
*@return the array of document descriptions to expire.
*/
public DocumentDescription[] getExpiredDocuments(int n, long currentTime)
- throws LCFException
+ throws ACFException
{
// Screening query
// Moved outside of transaction, so there's less chance of keeping jobstatus cache key tied up
@@ -1415,7 +1415,7 @@ public class JobManager implements IJobM
i++;
}
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -1471,7 +1471,7 @@ public class JobManager implements IJobM
public DocumentDescription[] getNextDocuments(int n, long currentTime, long interval,
BlockingDocuments blockingDocuments, PerformanceStatistics statistics,
DepthStatistics scanRecord)
- throws LCFException
+ throws ACFException
{
// NOTE WELL: Jobs that are throttled must control the number of documents that are fetched in
// a given interval. Therefore, the returned result has the following constraints on it:
@@ -1739,7 +1739,7 @@ public class JobManager implements IJobM
}
protected void addDocumentCriteria(StringBuffer sb, ArrayList list, Long currentTimeValue, Long currentPriorityValue)
- throws LCFException
+ throws ACFException
{
list.add(currentTimeValue);
list.add(jobQueue.actionToString(JobQueue.ACTION_RESCAN));
@@ -1776,7 +1776,7 @@ public class JobManager implements IJobM
/** Fetch and process documents matching the passed-in criteria */
protected void fetchAndProcessDocuments(ArrayList answers, Long currentTimeValue, Long currentPriorityValue,
ThrottleLimit vList, IRepositoryConnection[] connections)
- throws LCFException
+ throws ACFException
{
// Note well: This query does not do "FOR UPDATE". The reason is that only one thread can possibly change the document's state to active.
@@ -1916,7 +1916,7 @@ public class JobManager implements IJobM
}
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -1954,14 +1954,14 @@ public class JobManager implements IJobM
*@return true if the job is in one of the "active" states.
*/
public boolean checkJobActive(Long jobID)
- throws LCFException
+ throws ACFException
{
return jobs.checkJobActive(jobID);
}
/** Verify if a job is still processing documents, or no longer has any outstanding active documents */
public boolean checkJobBusy(Long jobID)
- throws LCFException
+ throws ACFException
{
return jobQueue.checkJobBusy(jobID);
}
@@ -1971,7 +1971,7 @@ public class JobManager implements IJobM
*@param documentDescriptions are the description objects for the documents that were processed.
*/
public void markDocumentCompletedMultiple(DocumentDescription[] documentDescriptions)
- throws LCFException
+ throws ACFException
{
// Before we can change a document status, we need to know the *current* status. Therefore, a SELECT xxx FOR UPDATE/UPDATE
// transaction is needed in order to complete these documents correctly.
@@ -2030,7 +2030,7 @@ public class JobManager implements IJobM
}
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -2063,7 +2063,7 @@ public class JobManager implements IJobM
*@param documentDescription is the description object for the document that was processed.
*/
public void markDocumentCompleted(DocumentDescription documentDescription)
- throws LCFException
+ throws ACFException
{
markDocumentCompletedMultiple(new DocumentDescription[]{documentDescription});
}
@@ -2076,7 +2076,7 @@ public class JobManager implements IJobM
*/
public DocumentDescription[] markDocumentDeletedMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
int hopcountMethod)
- throws LCFException
+ throws ACFException
{
if (documentDescriptions.length == 0)
return new DocumentDescription[0];
@@ -2146,7 +2146,7 @@ public class JobManager implements IJobM
" docs and clean up hopcount for job "+jobID.toString());
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -2177,7 +2177,7 @@ public class JobManager implements IJobM
/** Helper method: Find the document descriptions that will be affected due to carrydown row deletions.
*/
protected DocumentDescription[] calculateAffectedDeleteCarrydownChildren(Long jobID, String[] docIDHashes)
- throws LCFException
+ throws ACFException
{
// Break the request into pieces, as needed, and throw everything into a hash for uniqueness.
// We are going to need to break up this query into a number of subqueries, each covering a subset of parent id hashes.
@@ -2224,7 +2224,7 @@ public class JobManager implements IJobM
/** Helper method: look up rows affected by a deleteRecords operation.
*/
protected void processDeleteHashSet(Long jobID, HashMap resultHash, String queryPart, ArrayList list)
- throws LCFException
+ throws ACFException
{
// The query here mirrors the carrydown.restoreRecords() delete query! However, it also fetches enough information to build a DocumentDescription
// object for return, and so a join is necessary against the jobqueue table.
@@ -2255,7 +2255,7 @@ public class JobManager implements IJobM
*/
public DocumentDescription[] markDocumentDeleted(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
int hopcountMethod)
- throws LCFException
+ throws ACFException
{
return markDocumentDeletedMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
}
@@ -2271,7 +2271,7 @@ public class JobManager implements IJobM
*/
public void requeueDocumentMultiple(DocumentDescription[] documentDescriptions, Long[] executeTimes,
int[] actions)
- throws LCFException
+ throws ACFException
{
String[] docIDHashes = new String[documentDescriptions.length];
Long[] ids = new Long[documentDescriptions.length];
@@ -2298,7 +2298,7 @@ public class JobManager implements IJobM
String docIDHash = docIDHashes[i];
Integer x = (Integer)indexMap.remove(docIDHash);
if (x == null)
- throw new LCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
+ throw new ACFException("Assertion failure: duplicate document identifier jobid/hash detected!");
int index = x.intValue();
ids[i] = documentDescriptions[index].getID();
executeTimesNew[i] = executeTimes[index];
@@ -2327,7 +2327,7 @@ public class JobManager implements IJobM
database.signalRollback();
throw e;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -2356,7 +2356,7 @@ public class JobManager implements IJobM
*@param action is what should be done when the time arrives. Choices include ACTION_RESCAN or ACTION_REMOVE.
*/
public void requeueDocument(DocumentDescription documentDescription, Long executeTime, int action)
- throws LCFException
+ throws ACFException
{
requeueDocumentMultiple(new DocumentDescription[]{documentDescription},new Long[]{executeTime},new int[]{action});
}
@@ -2374,7 +2374,7 @@ public class JobManager implements IJobM
*/
public void resetDocumentMultiple(DocumentDescription[] documentDescriptions, long executeTime,
int action, long failTime, int failCount)
- throws LCFException
+ throws ACFException
{
Long executeTimeLong = new Long(executeTime);
Long[] ids = new Long[documentDescriptions.length];
@@ -2404,7 +2404,7 @@ public class JobManager implements IJobM
String docIDHash = docIDHashes[i];
Integer x = (Integer)indexMap.remove(docIDHash);
if (x == null)
- throw new LCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
+ throw new ACFException("Assertion failure: duplicate document identifier jobid/hash detected!");
int index = x.intValue();
ids[i] = documentDescriptions[index].getID();
executeTimes[i] = executeTimeLong;
@@ -2449,7 +2449,7 @@ public class JobManager implements IJobM
database.signalRollback();
throw e;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -2478,7 +2478,7 @@ public class JobManager implements IJobM
*@param documentDescriptions is the set of description objects for the document that was processed.
*/
public void resetDeletingDocumentMultiple(DocumentDescription[] documentDescriptions)
- throws LCFException
+ throws ACFException
{
Long[] ids = new Long[documentDescriptions.length];
String[] docIDHashes = new String[documentDescriptions.length];
@@ -2503,7 +2503,7 @@ public class JobManager implements IJobM
String docIDHash = docIDHashes[i];
Integer x = (Integer)indexMap.remove(docIDHash);
if (x == null)
- throw new LCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
+ throw new ACFException("Assertion failure: duplicate document identifier jobid/hash detected!");
int index = x.intValue();
ids[i] = documentDescriptions[index].getID();
i++;
@@ -2526,7 +2526,7 @@ public class JobManager implements IJobM
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -2555,7 +2555,7 @@ public class JobManager implements IJobM
* This gets done when a deleting thread sees a service interruption, etc., from the ingestion system.
*/
public void resetDeletingDocument(DocumentDescription documentDescription)
- throws LCFException
+ throws ACFException
{
resetDeletingDocumentMultiple(new DocumentDescription[]{documentDescription});
}
@@ -2573,7 +2573,7 @@ public class JobManager implements IJobM
*/
public void resetDocument(DocumentDescription documentDescription, long executeTime, int action, long failTime,
int failCount)
- throws LCFException
+ throws ACFException
{
resetDocumentMultiple(new DocumentDescription[]{documentDescription},executeTime,action,failTime,failCount);
}
@@ -2648,7 +2648,7 @@ public class JobManager implements IJobM
String[] docIDHashes, String[] docIDs, boolean overrideSchedule,
int hopcountMethod, long currentTime, double[] documentPriorities,
String[][] prereqEventNames)
- throws LCFException
+ throws ACFException
{
if (docIDHashes.length == 0)
return new boolean[0];
@@ -2765,7 +2765,7 @@ public class JobManager implements IJobM
return rval;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -2802,7 +2802,7 @@ public class JobManager implements IJobM
*/
public void addRemainingDocumentsInitial(Long jobID, String[] legalLinkTypes, String[] docIDHashes,
int hopcountMethod)
- throws LCFException
+ throws ACFException
{
if (docIDHashes.length == 0)
return;
@@ -2837,7 +2837,7 @@ public class JobManager implements IJobM
" remaining docs and hopcounts for job "+jobID.toString());
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -2874,7 +2874,7 @@ public class JobManager implements IJobM
*/
public void doneDocumentsInitial(Long jobID, String[] legalLinkTypes, boolean isPartial,
int hopcountMethod)
- throws LCFException
+ throws ACFException
{
long startTime = 0L;
if (Logging.perf.isDebugEnabled())
@@ -2909,7 +2909,7 @@ public class JobManager implements IJobM
" ms to finish initial docs and hopcounts for job "+jobID.toString());
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -2947,13 +2947,13 @@ public class JobManager implements IJobM
*/
public boolean[] findHopCounts(Long jobID, String[] legalLinkTypes, String[] docIDHashes, String linkType, int limit,
int hopcountMethod)
- throws LCFException
+ throws ACFException
{
if (docIDHashes.length == 0)
return new boolean[0];
if (legalLinkTypes.length == 0)
- throw new LCFException("Nonsensical request; asking for hopcounts where none are kept");
+ throw new ACFException("Nonsensical request; asking for hopcounts where none are kept");
// The idea is to delay queue processing as much as possible, because that avoids having to wait
// on locks and having to repeat our evaluations.
@@ -3069,7 +3069,7 @@ public class JobManager implements IJobM
// Definitive answers found; continue through.
distances = hopCount.findHopCounts(jobID,askDocIDHashes,linkType);
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -3124,7 +3124,7 @@ public class JobManager implements IJobM
*@return the document identifiers that are currently considered to be seeds.
*/
public String[] getAllSeeds(Long jobID)
- throws LCFException
+ throws ACFException
{
return jobQueue.getAllSeeds(jobID);
}
@@ -3154,7 +3154,7 @@ public class JobManager implements IJobM
int hopcountMethod, String[][] dataNames, Object[][][] dataValues,
long currentTime, double[] documentPriorities,
String[][] prereqEventNames)
- throws LCFException
+ throws ACFException
{
if (docIDs.length == 0)
return new boolean[0];
@@ -3202,7 +3202,7 @@ public class JobManager implements IJobM
else
{
// It better be a String.
- valueHash = LCF.hash((String)values[y]);
+ valueHash = ACF.hash((String)values[y]);
}
valueMap.put(valueHash,values[y]);
y++;
@@ -3368,7 +3368,7 @@ public class JobManager implements IJobM
return rval;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -3417,7 +3417,7 @@ public class JobManager implements IJobM
String parentIdentifierHash, String relationshipType,
int hopcountMethod, String[] dataNames, Object[][] dataValues,
long currentTime, double priority, String[] prereqEventNames)
- throws LCFException
+ throws ACFException
{
return addDocuments(jobID,legalLinkTypes,
new String[]{docIDHash},new String[]{docID},
@@ -3435,7 +3435,7 @@ public class JobManager implements IJobM
* to be requeued as a result of the change.
*/
public DocumentDescription[] finishDocuments(Long jobID, String[] legalLinkTypes, String[] parentIdentifierHashes, int hopcountMethod)
- throws LCFException
+ throws ACFException
{
if (parentIdentifierHashes.length == 0)
return new DocumentDescription[0];
@@ -3457,7 +3457,7 @@ public class JobManager implements IJobM
carryDown.restoreRecords(jobID,parentIdentifierHashes);
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -3515,7 +3515,7 @@ public class JobManager implements IJobM
Integer.toString(parentIdentifierHashes.length)+" doc hopcounts for job "+jobID.toString());
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -3547,7 +3547,7 @@ public class JobManager implements IJobM
/** Helper method: Calculate the unique set of affected carrydown children resulting from a "restoreRecords" operation.
*/
protected DocumentDescription[] calculateAffectedRestoreCarrydownChildren(Long jobID, String[] parentIDHashes)
- throws LCFException
+ throws ACFException
{
// We are going to need to break up this query into a number of subqueries, each covering a subset of parent id hashes.
// The goal is to throw all the children into a hash, to make them unique at the end.
@@ -3593,7 +3593,7 @@ public class JobManager implements IJobM
/** Helper method: look up rows affected by a restoreRecords operation.
*/
protected void processParentHashSet(Long jobID, HashMap resultHash, String queryPart, ArrayList list)
- throws LCFException
+ throws ACFException
{
// The query here mirrors the carrydown.restoreRecords() delete query! However, it also fetches enough information to build a DocumentDescription
// object for return, and so a join is necessary against the jobqueue table.
@@ -3623,14 +3623,14 @@ public class JobManager implements IJobM
*@return true if the event could be created, or false if it's already there.
*/
public boolean beginEventSequence(String eventName)
- throws LCFException
+ throws ACFException
{
try
{
eventManager.createEvent(eventName);
return true;
}
- catch (LCFException e)
+ catch (ACFException e)
{
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
return false;
@@ -3642,7 +3642,7 @@ public class JobManager implements IJobM
*@param eventName is the name of the event.
*/
public void completeEventSequence(String eventName)
- throws LCFException
+ throws ACFException
{
eventManager.destroyEvent(eventName);
}
@@ -3656,7 +3656,7 @@ public class JobManager implements IJobM
*@return a flag for each document priority, true if it was used, false otherwise.
*/
public boolean[] carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, double[] docPriorities)
- throws LCFException
+ throws ACFException
{
if (documentDescriptions.length == 0)
return new boolean[0];
@@ -3739,7 +3739,7 @@ public class JobManager implements IJobM
}
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -3774,7 +3774,7 @@ public class JobManager implements IJobM
*@return a flag for the document priority, true if it was used, false otherwise.
*/
public boolean carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, double docPriority)
- throws LCFException
+ throws ACFException
{
return carrydownChangeDocumentMultiple(new DocumentDescription[]{documentDescription},currentTime,new double[]{docPriority})[0];
}
@@ -3788,18 +3788,18 @@ public class JobManager implements IJobM
}
protected void sleepFor(long amt)
- throws LCFException
+ throws ACFException
{
if (amt == 0L)
return;
try
{
- LCF.sleep(amt);
+ ACF.sleep(amt);
}
catch (InterruptedException e)
{
- throw new LCFException("Interrupted",e,LCFException.INTERRUPTED);
+ throw new ACFException("Interrupted",e,ACFException.INTERRUPTED);
}
}
@@ -3810,7 +3810,7 @@ public class JobManager implements IJobM
*@return the unique data values.
*/
public String[] retrieveParentData(Long jobID, String docIDHash, String dataName)
- throws LCFException
+ throws ACFException
{
return carryDown.getDataValues(jobID,docIDHash,dataName);
}
@@ -3822,7 +3822,7 @@ public class JobManager implements IJobM
*@return the unique data values.
*/
public CharacterInput[] retrieveParentDataAsFiles(Long jobID, String docIDHash, String dataName)
- throws LCFException
+ throws ACFException
{
return carryDown.getDataValuesAsFiles(jobID,docIDHash,dataName);
}
@@ -3843,7 +3843,7 @@ public class JobManager implements IJobM
*@param unwaitList is filled in with the set of job ID objects that were resumed.
*/
public void startJobs(long currentTime, ArrayList unwaitList)
- throws LCFException
+ throws ACFException
{
// This method should compare the lasttime field against the current time, for all
// "not active" jobs, and see if a job should be started.
@@ -4076,7 +4076,7 @@ public class JobManager implements IJobM
}
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
throw e;
@@ -4097,7 +4097,7 @@ public class JobManager implements IJobM
*@param waitList is filled in with the set of job ID's that were put into a wait state.
*/
public void waitJobs(long currentTime, ArrayList waitList)
- throws LCFException
+ throws ACFException
{
// This method assesses jobs that are ACTIVE or PAUSED to see if they should be
// converted to ACTIVEWAIT or PAUSEDWAIT. This would happen if the current time exceeded
@@ -4178,7 +4178,7 @@ public class JobManager implements IJobM
}
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
throw e;
@@ -4199,7 +4199,7 @@ public class JobManager implements IJobM
*@param jobID is the job identifier.
*/
public void resetJobSchedule(Long jobID)
- throws LCFException
+ throws ACFException
{
// Note: This is problematic; the expected behavior is for the job to start if "we are within the window",
// but not to start if the transition to active status was long enough ago.
@@ -4413,7 +4413,7 @@ public class JobManager implements IJobM
*@param jobID is the ID of the job to start.
*/
public void manualStart(Long jobID)
- throws LCFException
+ throws ACFException
{
database.beginTransaction();
try
@@ -4426,12 +4426,12 @@ public class JobManager implements IJobM
" FROM "+jobs.getTableName()+" WHERE "+
jobs.idField+"=? FOR UPDATE",list,null,null);
if (set.getRowCount() < 1)
- throw new LCFException("No such job: "+jobID);
+ throw new ACFException("No such job: "+jobID);
IResultRow row = set.getRow(0);
int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString());
if (status != Jobs.STATUS_INACTIVE)
- throw new LCFException("Job "+jobID+" is already running");
+ throw new ACFException("Job "+jobID+" is already running");
IJobDescription jobDescription = jobs.load(jobID,true);
if (Logging.jobs.isDebugEnabled())
@@ -4447,7 +4447,7 @@ public class JobManager implements IJobM
}
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
throw e;
@@ -4468,7 +4468,7 @@ public class JobManager implements IJobM
*@param startTime is the job start time.
*/
public void noteJobStarted(Long jobID, long startTime)
- throws LCFException
+ throws ACFException
{
jobs.noteJobStarted(jobID,startTime);
if (Logging.jobs.isDebugEnabled())
@@ -4480,7 +4480,7 @@ public class JobManager implements IJobM
*@param seedTime is the job seed time.
*/
public void noteJobSeeded(Long jobID, long seedTime)
- throws LCFException
+ throws ACFException
{
jobs.noteJobSeeded(jobID,seedTime);
if (Logging.jobs.isDebugEnabled())
@@ -4492,7 +4492,7 @@ public class JobManager implements IJobM
*@param hopcountMethod describes how to handle deletions for hopcount purposes.
*/
public void prepareFullScan(Long jobID, String[] legalLinkTypes, int hopcountMethod)
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -4511,7 +4511,7 @@ public class JobManager implements IJobM
jobQueue.prepareFullScan(jobID);
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -4541,7 +4541,7 @@ public class JobManager implements IJobM
*@param hopcountMethod describes how to handle deletions for hopcount purposes.
*/
public void prepareIncrementalScan(Long jobID, String[] legalLinkTypes, int hopcountMethod)
- throws LCFException
+ throws ACFException
{
jobQueue.prepareIncrementalScan(jobID);
}
@@ -4551,7 +4551,7 @@ public class JobManager implements IJobM
*@param jobID is the job to abort.
*/
public void manualAbort(Long jobID)
- throws LCFException
+ throws ACFException
{
// Just whack status back to "INACTIVE". The active documents will continue to be processed until done,
// but that's fine. There will be no finishing stage, obviously.
@@ -4571,7 +4571,7 @@ public class JobManager implements IJobM
*@param jobID is the job to abort.
*/
public void manualAbortRestart(Long jobID)
- throws LCFException
+ throws ACFException
{
if (Logging.jobs.isDebugEnabled())
{
@@ -4590,7 +4590,7 @@ public class JobManager implements IJobM
*@return true if this is the first logged abort request for this job.
*/
public boolean errorAbort(Long jobID, String errorText)
- throws LCFException
+ throws ACFException
{
// Just whack status back to "INACTIVE". The active documents will continue to be processed until done,
// but that's fine. There will be no finishing stage, obviously.
@@ -4610,7 +4610,7 @@ public class JobManager implements IJobM
*@param jobID is the job identifier to pause.
*/
public void pauseJob(Long jobID)
- throws LCFException
+ throws ACFException
{
if (Logging.jobs.isDebugEnabled())
{
@@ -4628,7 +4628,7 @@ public class JobManager implements IJobM
*@param jobID is the job identifier to restart.
*/
public void restartJob(Long jobID)
- throws LCFException
+ throws ACFException
{
if (Logging.jobs.isDebugEnabled())
{
@@ -4641,7 +4641,7 @@ public class JobManager implements IJobM
jobs.restartJob(jobID);
jobQueue.clearFailTimes(jobID);
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
throw e;
@@ -4668,7 +4668,7 @@ public class JobManager implements IJobM
* based on what the connector says should be added to the queue.
*/
public JobStartRecord[] getJobsReadyForSeeding(long currentTime)
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -4717,7 +4717,7 @@ public class JobManager implements IJobM
}
return rval;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -4746,7 +4746,7 @@ public class JobManager implements IJobM
*@return jobs that were in the "readyforstartup" state. These will be marked as being in the "starting up" state.
*/
public JobStartRecord[] getJobsReadyForStartup()
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -4782,7 +4782,7 @@ public class JobManager implements IJobM
}
return rval;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -4811,7 +4811,7 @@ public class JobManager implements IJobM
*@param jobID is the job id.
*/
public void resetStartupJob(Long jobID)
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -4825,7 +4825,7 @@ public class JobManager implements IJobM
IResultSet set = database.performQuery("SELECT "+jobs.statusField+" FROM "+jobs.getTableName()+
" WHERE "+jobs.idField+"=? FOR UPDATE",list,null,null);
if (set.getRowCount() == 0)
- throw new LCFException("No such job: "+jobID);
+ throw new ACFException("No such job: "+jobID);
IResultRow row = set.getRow(0);
int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
@@ -4855,11 +4855,11 @@ public class JobManager implements IJobM
// ok
break;
default:
- throw new LCFException("Unexpected job status: "+Integer.toString(status));
+ throw new ACFException("Unexpected job status: "+Integer.toString(status));
}
return;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -4888,7 +4888,7 @@ public class JobManager implements IJobM
*@param jobID is the job id.
*/
public void resetSeedJob(Long jobID)
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -4902,7 +4902,7 @@ public class JobManager implements IJobM
IResultSet set = database.performQuery("SELECT "+jobs.statusField+" FROM "+jobs.getTableName()+
" WHERE "+jobs.idField+"=? FOR UPDATE",list,null,null);
if (set.getRowCount() == 0)
- throw new LCFException("No such job: "+jobID);
+ throw new ACFException("No such job: "+jobID);
IResultRow row = set.getRow(0);
int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
switch (status)
@@ -4984,11 +4984,11 @@ public class JobManager implements IJobM
// ok
break;
default:
- throw new LCFException("Unexpected job status: "+Integer.toString(status));
+ throw new ACFException("Unexpected job status: "+Integer.toString(status));
}
return;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -5018,7 +5018,7 @@ public class JobManager implements IJobM
* This method is meant to be called periodically to perform delete processing on jobs.
*/
public void deleteJobsReadyForDelete()
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -5075,7 +5075,7 @@ public class JobManager implements IJobM
if (confirmSet.getRowCount() > 0)
continue;
- LCF.noteConfigurationChange();
+ ACF.noteConfigurationChange();
// Remove documents from job queue
jobQueue.deleteAllJobRecords(jobID);
// Remove carrydowns for the job
@@ -5090,7 +5090,7 @@ public class JobManager implements IJobM
}
return;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -5119,7 +5119,7 @@ public class JobManager implements IJobM
* decides if it is time to issue an ANALYZE request.
*/
protected void conditionallyAnalyzeTables()
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -5130,7 +5130,7 @@ public class JobManager implements IJobM
jobQueue.conditionallyAnalyzeTables();
break;
}
- catch (LCFException e)
+ catch (ACFException e)
{
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
{
@@ -5152,7 +5152,7 @@ public class JobManager implements IJobM
*@param finishList is filled in with the set of IJobDescription objects that were completed.
*/
public void finishJobs(ArrayList finishList)
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -5229,7 +5229,7 @@ public class JobManager implements IJobM
}
return;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -5259,7 +5259,7 @@ public class JobManager implements IJobM
*@param abortJobs is the set of IJobDescription objects that were aborted (and stopped).
*/
public void finishJobAborts(long timestamp, ArrayList abortJobs)
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -5326,12 +5326,12 @@ public class JobManager implements IJobM
}
break;
default:
- throw new LCFException("Unexpected value for job status: "+Integer.toString(status));
+ throw new ACFException("Unexpected value for job status: "+Integer.toString(status));
}
}
return;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -5361,7 +5361,7 @@ public class JobManager implements IJobM
*@param currentTime is the current time in milliseconds since epoch.
*/
public void resetJobs(long currentTime)
- throws LCFException
+ throws ACFException
{
while (true)
{
@@ -5413,7 +5413,7 @@ public class JobManager implements IJobM
}
return;
}
- catch (LCFException e)
+ catch (ACFException e)
{
database.signalRollback();
if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
@@ -5445,7 +5445,7 @@ public class JobManager implements IJobM
*@return the status object for the specified job.
*/
public JobStatus getStatus(Long jobID)
- throws LCFException
+ throws ACFException
{
String whereClause = Jobs.idField+"="+jobID.toString();
JobStatus[] records = makeJobStatus(whereClause);
@@ -5459,7 +5459,7 @@ public class JobManager implements IJobM
*@return an ordered array of job status objects.
*/
public JobStatus[] getAllStatus()
- throws LCFException
+ throws ACFException
{
String whereClause = null;
return makeJobStatus(whereClause);
@@ -5470,7 +5470,7 @@ public class JobManager implements IJobM
*@return an array of the job status objects.
*/
public JobStatus[] getRunningJobs()
- throws LCFException
+ throws ACFException
{
String whereClause =
Jobs.statusField+" IN ("+
@@ -5496,7 +5496,7 @@ public class JobManager implements IJobM
*@return an array of the job status objects.
*/
public JobStatus[] getFinishedJobs()
- throws LCFException
+ throws ACFException
{
String whereClause =
Jobs.statusField+"="+database.quoteSQLString(Jobs.statusToString(Jobs.STATUS_INACTIVE))+" AND "+
@@ -5512,7 +5512,7 @@ public class JobManager implements IJobM
*@return the status array.
*/
protected JobStatus[] makeJobStatus(String whereClause)
- throws LCFException
+ throws ACFException
{
IResultSet set = database.performQuery("SELECT t0."+
Jobs.idField+",t0."+
@@ -5689,7 +5689,7 @@ public class JobManager implements IJobM
*/
public IResultSet genDocumentStatus(String connectionName, StatusFilterCriteria filterCriteria, SortOrder sortOrder,
int startRow, int rowCount)
- throws LCFException
+ throws ACFException
{
// Build the query.
Long currentTime = new Long(System.currentTimeMillis());
@@ -5798,7 +5798,7 @@ public class JobManager implements IJobM
*/
public IResultSet genQueueStatus(String connectionName, StatusFilterCriteria filterCriteria, SortOrder sortOrder,
BucketDescription idBucketDescription, int startRow, int rowCount)
- throws LCFException
+ throws ACFException
{
// SELECT substring(docid FROM '<id_regexp>') AS idbucket,
// substring(entityidentifier FROM '<id_regexp>') AS idbucket,
@@ -5919,7 +5919,7 @@ public class JobManager implements IJobM
/** Add criteria clauses to query.
*/
protected boolean addCriteria(StringBuffer sb, String fieldPrefix, String connectionName, StatusFilterCriteria criteria, boolean whereEmitted)
- throws LCFException
+ throws ACFException
{
Long[] matchingJobs = criteria.getJobs();
@@ -6260,7 +6260,7 @@ public class JobManager implements IJobM
*@param connectionName is the connection name.
*/
public void addConnectionName(String connectionName, IRepositoryConnector connectorInstance)
- throws LCFException
+ throws ACFException
{
activeConnections.put(connectionName,connectorInstance);
int setSize = connectorInstance.getMaxDocumentRequest();
@@ -6379,7 +6379,7 @@ public class JobManager implements IJobM
*@return true if it should be included, false otherwise.
*/
public boolean checkInclude(IResultRow row)
- throws LCFException
+ throws ACFException
{
// Note: This method does two things: First, it insures that the number of documents per job per bin does
// not exceed the calculated throttle number. Second, it keeps track of how many document queue items
@@ -6431,7 +6431,7 @@ public class JobManager implements IJobM
// Figure out what the right bins are, given the data we have.
// This will involve a call to the connector.
- String[] binNames = LCF.calculateBins(connectorInstance,docID);
+ String[] binNames = ACF.calculateBins(connectorInstance,docID);
// Keep the running count, so we can abort without going through the whole set.
documentsProcessed++;
//scanRecord.addBins(binNames);
@@ -6485,7 +6485,7 @@ public class JobManager implements IJobM
*@return true if we need to keep going, or false if we are done.
*/
public boolean checkContinue()
- throws LCFException
+ throws ACFException
{
if (documentsProcessed >= EXTRA_FACTOR * n * maxSetSize)
return false;
Modified: incubator/lcf/trunk/modules/framework/pull-agent/org/apache/acf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/org/apache/acf/crawler/jobs/JobQueue.java?rev=988237&r1=988236&r2=988237&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/org/apache/acf/crawler/jobs/JobQueue.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/org/apache/acf/crawler/jobs/JobQueue.java Mon Aug 23 18:08:32 2010
@@ -21,7 +21,7 @@ package org.apache.acf.crawler.jobs;
import org.apache.acf.core.interfaces.*;
import org.apache.acf.crawler.interfaces.*;
import org.apache.acf.crawler.system.Logging;
-import org.apache.acf.crawler.system.LCF;
+import org.apache.acf.crawler.system.ACF;
import java.util.*;
/** This is the job queue manager class. It is responsible for managing the jobqueue database table.
@@ -139,7 +139,7 @@ public class JobQueue extends org.apache
*@param database is the database handle.
*/
public JobQueue(IThreadContext tc, IDBInterface database)
- throws LCFException
+ throws ACFException
{
super(database,"jobqueue");
this.threadContext = tc;
@@ -149,7 +149,7 @@ public class JobQueue extends org.apache
/** Install or upgrade.
*/
public void install(String jobsTable, String jobsColumn)
- throws LCFException
+ throws ACFException
{
// Standard practice to use outer loop to allow retry in case of upgrade.
while (true)
@@ -254,7 +254,7 @@ public class JobQueue extends org.apache
/** Analyze job tables due to major event */
public void unconditionallyAnalyzeTables()
- throws LCFException
+ throws ACFException
{
try
{
@@ -272,7 +272,7 @@ public class JobQueue extends org.apache
/** Analyze job tables that need analysis.
*/
public void conditionallyAnalyzeTables()
- throws LCFException
+ throws ACFException
{
if (tracker.checkAnalyze())
{
@@ -298,7 +298,7 @@ public class JobQueue extends org.apache
/** Uninstall.
*/
public void deinstall()
- throws LCFException
+ throws ACFException
{
beginTransaction();
try
@@ -306,7 +306,7 @@ public class JobQueue extends org.apache
prereqEventManager.deinstall();
performDrop(null);
}
- catch (LCFException e)
+ catch (ACFException e)
{
signalRollback();
throw e;
@@ -327,7 +327,7 @@ public class JobQueue extends org.apache
* reasonable, so the jobs can be restarted and work properly to completion.
*/
public void restart()
- throws LCFException
+ throws ACFException
{
// Map ACTIVE back to PENDING.
HashMap map = new HashMap();
@@ -373,7 +373,7 @@ public class JobQueue extends org.apache
*@param jobID is the job identifier.
*/
public void clearFailTimes(Long jobID)
- throws LCFException
+ throws ACFException
{
ArrayList list = new ArrayList();
list.add(jobID);
@@ -388,7 +388,7 @@ public class JobQueue extends org.apache
* resets any active documents back to the right state (waiting for stuffing).
*/
public void resetDocumentWorkerStatus()
- throws LCFException
+ throws ACFException
{
// Map ACTIVE back to PENDING.
HashMap map = new HashMap();
@@ -409,7 +409,7 @@ public class JobQueue extends org.apache
/** Reset doc delete worker status.
*/
public void resetDocDeleteWorkerStatus()
- throws LCFException
+ throws ACFException
{
HashMap map = new HashMap();
ArrayList list = new ArrayList();
@@ -429,7 +429,7 @@ public class JobQueue extends org.apache
*@param jobID is the job identifier.
*/
public void prepareFullScan(Long jobID)
- throws LCFException
+ throws ACFException
{
// Delete PENDING and ACTIVE entries
ArrayList list = new ArrayList();
@@ -471,7 +471,7 @@ public class JobQueue extends org.apache
*@param jobID is the job identifier.
*/
public void prepareIncrementalScan(Long jobID)
- throws LCFException
+ throws ACFException
{
// Delete PENDING and ACTIVE entries
ArrayList list = new ArrayList();
@@ -500,7 +500,7 @@ public class JobQueue extends org.apache
*@param identifiers is the set of document identifiers.
*/
public void deleteIngestedDocumentIdentifiers(DocumentDescription[] identifiers)
- throws LCFException
+ throws ACFException
{
ArrayList list = new ArrayList();
StringBuffer sb = new StringBuffer();
@@ -519,7 +519,7 @@ public class JobQueue extends org.apache
/** Check if there are any outstanding active documents for a job */
public boolean checkJobBusy(Long jobID)
- throws LCFException
+ throws ACFException
{
ArrayList list = new ArrayList();
list.add(jobID);
@@ -536,7 +536,7 @@ public class JobQueue extends org.apache
*@param jobID is the job identifier.
*/
public void deleteAllJobRecords(Long jobID)
- throws LCFException
+ throws ACFException
{
ArrayList list = new ArrayList();
list.add(jobID);
@@ -548,7 +548,7 @@ public class JobQueue extends org.apache
/** Write out a document priority */
public void writeDocPriority(long currentTime, Long rowID, double priority)
- throws LCFException
+ throws ACFException
{
HashMap map = new HashMap();
map.put(prioritySetField,new Long(currentTime));
@@ -562,7 +562,7 @@ public class JobQueue extends org.apache
/** Set the "completed" status for a record.
*/
public void updateCompletedRecord(Long recID, int currentStatus)
- throws LCFException
+ throws ACFException
{
int newStatus;
String actionFieldValue;
@@ -582,7 +582,7 @@ public class JobQueue extends org.apache
checkTimeValue = new Long(0L);
break;
default:
- throw new LCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status");
+ throw new ACFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status");
}
HashMap map = new HashMap();
@@ -605,7 +605,7 @@ public class JobQueue extends org.apache
*@param currentStatus is the current status
*/
public void updateActiveRecord(Long id, int currentStatus)
- throws LCFException
+ throws ACFException
{
int newStatus;
switch (currentStatus)
@@ -617,7 +617,7 @@ public class JobQueue extends org.apache
newStatus = STATUS_ACTIVEPURGATORY;
break;
default:
- throw new LCFException("Unexpected status value for jobqueue record "+id.toString()+"; got "+Integer.toString(currentStatus));
+ throw new ACFException("Unexpected status value for jobqueue record "+id.toString()+"; got "+Integer.toString(currentStatus));
}
ArrayList list = new ArrayList();
@@ -635,7 +635,7 @@ public class JobQueue extends org.apache
*/
public void setStatus(Long id, int status,
Long checkTime, int action, long failTime, int failCount)
- throws LCFException
+ throws ACFException
{
ArrayList list = new ArrayList();
list.add(id);
@@ -660,7 +660,7 @@ public class JobQueue extends org.apache
/** Set the status of a document to "being deleted".
*/
public void setDeletingStatus(Long id)
- throws LCFException
+ throws ACFException
{
ArrayList list = new ArrayList();
list.add(id);
@@ -672,7 +672,7 @@ public class JobQueue extends org.apache
/** Set the status of a document to be "no longer deleting" */
public void setUndeletingStatus(Long id)
- throws LCFException
+ throws ACFException
{
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_COMPLETE));
@@ -689,7 +689,7 @@ public class JobQueue extends org.apache
*@param ids is the set of job queue id's
*/
public void deleteRecordMultiple(Long[] ids)
- throws LCFException
+ throws ACFException
{
// Delete in chunks
int maxInClause = getMaxInClause();
@@ -720,7 +720,7 @@ public class JobQueue extends org.apache
/** Do a batch of deletes.
*/
protected void doDeletes(ArrayList list, String queryPart)
- throws LCFException
+ throws ACFException
{
// Clean out prereqevents table first
prereqEventManager.deleteRows(queryPart,list);
@@ -731,7 +731,7 @@ public class JobQueue extends org.apache
*@param id is the job queue id.
*/
public void deleteRecord(Long id)
- throws LCFException
+ throws ACFException
{
deleteRecordMultiple(new Long[]{id});
}
@@ -741,7 +741,7 @@ public class JobQueue extends org.apache
*/
public boolean updateExistingRecordInitial(Long recordID, int currentStatus, Long checkTimeValue,
long desiredExecuteTime, long currentTime, double desiredPriority, String[] prereqEvents)
- throws LCFException
+ throws ACFException
{
// The general rule here is:
// If doesn't exist, make a PENDING entry.
@@ -833,7 +833,7 @@ public class JobQueue extends org.apache
*/
public void insertNewRecordInitial(Long jobID, String docHash, String docID, double desiredDocPriority,
long desiredExecuteTime, long currentTime, String[] prereqEvents)
- throws LCFException
+ throws ACFException
{
// No prerequisites should be possible at this point.
HashMap map = new HashMap();
@@ -861,7 +861,7 @@ public class JobQueue extends org.apache
* doneDocumentsInitial() method does not clean up seeds from previous runs wrongly.
*/
public void addRemainingDocumentsInitial(Long jobID, String[] docIDHashes)
- throws LCFException
+ throws ACFException
{
if (docIDHashes.length == 0)
return;
@@ -939,7 +939,7 @@ public class JobQueue extends org.apache
/** Process the specified set of documents. */
protected void processRemainingDocuments(Map idMap, String query, ArrayList list, Map inSet)
- throws LCFException
+ throws ACFException
{
IResultSet set = performQuery("SELECT "+idField+","+docHashField+" FROM "+getTableName()+
" WHERE "+query+" FOR UPDATE",list,null,null);
@@ -958,7 +958,7 @@ public class JobQueue extends org.apache
/** Update the specified set of documents to be "NEWSEED" */
protected void updateRemainingDocuments(String query, ArrayList list)
- throws LCFException
+ throws ACFException
{
HashMap map = new HashMap();
map.put(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED));
@@ -973,7 +973,7 @@ public class JobQueue extends org.apache
*@param isPartial is true of the passed list of seeds is not complete.
*/
public void doneDocumentsInitial(Long jobID, boolean isPartial)
- throws LCFException
+ throws ACFException
{
ArrayList list = new ArrayList();
HashMap map = new HashMap();
@@ -997,7 +997,7 @@ public class JobQueue extends org.apache
*@return the document identifier hashes that are currently considered to be seeds.
*/
public String[] getAllSeeds(Long jobID)
- throws LCFException
+ throws ACFException
{
ArrayList list = new ArrayList();
list.add(jobID);
@@ -1020,7 +1020,7 @@ public class JobQueue extends org.apache
public boolean updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
long desiredExecuteTime, long currentTime, boolean otherChangesSeen, double desiredPriority,
String[] prereqEvents)
- throws LCFException
+ throws ACFException
{
boolean rval = false;
HashMap map = new HashMap();
@@ -1147,7 +1147,7 @@ public class JobQueue extends org.apache
*/
public void insertNewRecord(Long jobID, String docIDHash, String docID, double desiredDocPriority, long desiredExecuteTime,
long currentTime, String[] prereqEvents)
- throws LCFException
+ throws ACFException
{
HashMap map = new HashMap();
Long recordID = new Long(IDFactory.make(threadContext));
@@ -1171,7 +1171,7 @@ public class JobQueue extends org.apache
/** Convert seedstatus value to a string.
*/
public static String seedstatusToString(int status)
- throws LCFException
+ throws ACFException
{
switch (status)
{
@@ -1182,37 +1182,37 @@ public class JobQueue extends org.apache
case SEEDSTATUS_NEWSEED:
return "N";
default:
- throw new LCFException("Invalid seed status: "+Integer.toString(status));
+ throw new ACFException("Invalid seed status: "+Integer.toString(status));
}
}
/** Convert seedstatus field value to a boolean.
*/
public static int stringToSeedstatus(String x)
- throws LCFException
+ throws ACFException
{
if (x == null || x.length() == 0)
return SEEDSTATUS_NOTSEED;
Integer y = (Integer)seedstatusMap.get(x);
if (y == null)
- throw new LCFException("Unknown seed status code: "+x);
+ throw new ACFException("Unknown seed status code: "+x);
return y.intValue();
}
/** Convert action field value to integer.
*/
public static int stringToAction(String value)
- throws LCFException
+ throws ACFException
{
Integer x = (Integer)actionMap.get(value);
if (x == null)
- throw new LCFException("Unknown action string: '"+value+"'");
+ throw new ACFException("Unknown action string: '"+value+"'");
return x.intValue();
}
/** Convert integer to action string */
public static String actionToString(int action)
- throws LCFException
+ throws ACFException
{
switch (action)
{
@@ -1221,7 +1221,7 @@ public class JobQueue extends org.apache
case ACTION_REMOVE:
return "D";
default:
- throw new LCFException("Bad action value: "+Integer.toString(action));
+ throw new ACFException("Bad action value: "+Integer.toString(action));
}
}
@@ -1230,11 +1230,11 @@ public class JobQueue extends org.apache
*@return the integer.
*/
public static int stringToStatus(String value)
- throws LCFException
+ throws ACFException
{
Integer x = (Integer)statusMap.get(value);
if (x == null)
- throw new LCFException("Unknown status string: '"+value+"'");
+ throw new ACFException("Unknown status string: '"+value+"'");
return x.intValue();
}
@@ -1243,7 +1243,7 @@ public class JobQueue extends org.apache
*@return the database string.
*/
public static String statusToString(int status)
- throws LCFException
+ throws ACFException
{
switch (status)
{
@@ -1266,7 +1266,7 @@ public class JobQueue extends org.apache
case STATUS_ACTIVENEEDRESCANPURGATORY:
return "f";
default:
- throw new LCFException("Bad status value: "+Integer.toString(status));
+ throw new ACFException("Bad status value: "+Integer.toString(status));
}
}
@@ -1277,9 +1277,9 @@ public class JobQueue extends org.apache
*@return the hash code.
*/
public static String getHashCode(String documentIdentifier)
- throws LCFException
+ throws ACFException
{
- return LCF.hash(documentIdentifier);
+ return ACF.hash(documentIdentifier);
}
/** Analyze tracker class.
@@ -1390,7 +1390,7 @@ public class JobQueue extends org.apache
*@return true if it should be included, false otherwise.
*/
public boolean checkInclude(IResultRow row)
- throws LCFException
+ throws ACFException
{
Long jobID = (Long)row.getValue(jobIDField);
String docIDHash = (String)row.getValue(docHashField);
@@ -1406,7 +1406,7 @@ public class JobQueue extends org.apache
*@return true if we need to keep going, or false if we are done.
*/
public boolean checkContinue()
- throws LCFException
+ throws ACFException
{
return true;
}