You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/10 01:19:10 UTC
svn commit: r1601529 [4/6] - in /manifoldcf/trunk: ./ connectors/
connectors/nulltransformation/
framework/agents/src/main/java/org/apache/manifoldcf/agents/
framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/
framework/agent...
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java Mon Jun 9 23:19:08 2014
@@ -121,9 +121,10 @@ public class ForcedParamManager extends
public Map<String,Set<String>> readRows(Long id)
throws ManifoldCFException
{
- ArrayList list = new ArrayList();
- list.add(id);
- IResultSet set = performQuery("SELECT "+paramNameField+","+paramValueField+" FROM "+getTableName()+" WHERE "+ownerIDField+"=?",list,
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(ownerIDField,id)});
+ IResultSet set = performQuery("SELECT "+paramNameField+","+paramValueField+" FROM "+getTableName()+" WHERE "+query,params,
null,null);
Map<String,Set<String>> rval = new HashMap<String,Set<String>>();
if (set.getRowCount() == 0)
@@ -168,6 +169,34 @@ public class ForcedParamManager extends
}
}
+ /** Compare the forced-metadata rows stored for an owner against a job description.
+ *@param ownerID is the owning identifier.
+ *@param list is the job description whose forced metadata is compared with the stored rows.
+ *@return true if both sides hold the same parameter names, each with the same set of values.
+ */
+ public boolean compareRows(Long ownerID, IJobDescription list)
+   throws ManifoldCFException
+ {
+   Map<String,Set<String>> storedParams = readRows(ownerID);
+   Map<String,Set<String>> describedParams = list.getForcedMetadata();
+   if (storedParams.size() != describedParams.size())
+     return false;
+   for (Map.Entry<String,Set<String>> entry : storedParams.entrySet())
+   {
+     Set<String> storedValues = entry.getValue();
+     Set<String> describedValues = describedParams.get(entry.getKey());
+     // A parameter name that is missing on the other side means the two differ.
+     if (describedValues == null)
+       return false;
+     // Equal sizes plus containment in one direction is enough for set equality.
+     if (storedValues.size() != describedValues.size() || !describedValues.containsAll(storedValues))
+       return false;
+   }
+   return true;
+ }
+
/** Write a filter list into the database.
*@param ownerID is the owning identifier.
*@param list is the job description to write hopcount filters for.
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopFilterManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopFilterManager.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopFilterManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopFilterManager.java Mon Jun 9 23:19:08 2014
@@ -147,7 +147,7 @@ public class HopFilterManager extends or
*@param ownerIDList is the list of owner id's.
*@param ownerIDParams is the corresponding set of owner id parameters.
*/
- public void getRows(Map returnValues, String ownerIDList, ArrayList ownerIDParams)
+ public void getRows(Map<Long,JobDescription> returnValues, String ownerIDList, ArrayList ownerIDParams)
throws ManifoldCFException
{
IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+ownerIDField+" IN ("+ownerIDList+")",ownerIDParams,
@@ -159,11 +159,35 @@ public class HopFilterManager extends or
Long ownerID = (Long)row.getValue(ownerIDField);
String linkType = (String)row.getValue(linkTypeField);
Long maxHops = (Long)row.getValue(maxHopsField);
- ((JobDescription)returnValues.get(ownerID)).addHopCountFilter(linkType,maxHops);
+ returnValues.get(ownerID).addHopCountFilter(linkType,maxHops);
i++;
}
}
+ /** Compare the hopcount filters stored for an owner against those in a job description.
+ *@param ownerID is the owning identifier.
+ *@param list is the job description whose hopcount filters are compared with the stored rows.
+ *@return true if both sides hold the same link types, each with the same hop count value.
+ */
+ public boolean compareRows(Long ownerID, IJobDescription list)
+   throws ManifoldCFException
+ {
+   Map storedFilters = readRows(ownerID);
+   Map describedFilters = list.getHopCountFilters();
+   if (storedFilters.size() != describedFilters.size())
+     return false;
+   for (String linkType : (Collection<String>)storedFilters.keySet())
+   {
+     Long storedCount = (Long)storedFilters.get(linkType);
+     Long describedCount = (Long)describedFilters.get(linkType);
+     // A link type that is absent (or null-valued) on either side counts as a difference.
+     if (storedCount == null || describedCount == null || !storedCount.equals(describedCount))
+       return false;
+   }
+   return true;
+ }
+
/** Write a filter list into the database.
*@param ownerID is the owning identifier.
*@param list is the job description to write hopcount filters for.
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java Mon Jun 9 23:19:08 2014
@@ -44,6 +44,7 @@ public class JobDescription implements I
protected String description = null;
protected String outputConnectionName = null;
protected String connectionName = null;
+ protected final List<PipelineStage> pipelineStages = new ArrayList<PipelineStage>();
protected int type = TYPE_CONTINUOUS;
protected int startMethod = START_WINDOWBEGIN;
protected int priority = 5;
@@ -96,7 +97,15 @@ public class JobDescription implements I
rval.id = id;
rval.isNew = isNew;
rval.outputConnectionName = outputConnectionName;
+ // Direct modification of this object is possible - so it also has to know if it is read-only!!
+ rval.outputSpecification = outputSpecification.duplicate(readOnly);
rval.connectionName = connectionName;
+ // Direct modification of this object is possible - so it also has to know if it is read-only!!
+ rval.documentSpecification = documentSpecification.duplicate(readOnly);
+ for (PipelineStage pipelineStage : pipelineStages)
+ {
+ rval.pipelineStages.add(new PipelineStage(pipelineStage.getConnectionName(),pipelineStage.getDescription(),pipelineStage.getSpecification().duplicate(readOnly)));
+ }
rval.description = description;
rval.type = type;
// No direct modification of this object is possible
@@ -124,10 +133,6 @@ public class JobDescription implements I
rval.addForcedMetadataValue(forcedParamName,value);
}
}
- // Direct modification of this object is possible - so it also has to know if it is read-only!!
- rval.outputSpecification = outputSpecification.duplicate(readOnly);
- // Direct modification of this object is possible - so it also has to know if it is read-only!!
- rval.documentSpecification = documentSpecification.duplicate(readOnly);
rval.readOnly = readOnly;
return rval;
}
@@ -242,6 +247,91 @@ public class JobDescription implements I
return connectionName;
}
+ /** Remove every stage from this job's pipeline.
+ * Throws IllegalStateException when this description is read-only.
+ */
+ @Override
+ public void clearPipeline()
+ {
+   if (readOnly)
+   {
+     throw new IllegalStateException("Attempt to change read-only object");
+   }
+   pipelineStages.clear();
+ }
+
+ /** Append a new stage to the end of the pipeline.
+ * Throws IllegalStateException when this description is read-only.
+ *@param pipelineStageConnectionName is the name of the pipeline connection to add.
+ *@param pipelineStageDescription is a description of the pipeline stage being added.
+ *@return the freshly created (empty) specification belonging to the new stage.
+ */
+ @Override
+ public OutputSpecification addPipelineStage(String pipelineStageConnectionName, String pipelineStageDescription)
+ {
+   if (readOnly)
+   {
+     throw new IllegalStateException("Attempt to change read-only object");
+   }
+   PipelineStage appendedStage = new PipelineStage(pipelineStageConnectionName,pipelineStageDescription);
+   pipelineStages.add(appendedStage);
+   OutputSpecification stageSpecification = appendedStage.getSpecification();
+   return stageSpecification;
+ }
+
+ /** Report how many stages the pipeline currently holds.
+ *@return the number of pipeline stages.
+ */
+ @Override
+ public int countPipelineStages()
+ {
+   final int stageCount = pipelineStages.size();
+   return stageCount;
+ }
+
+ /** Insert a new pipeline stage ahead of an existing position.
+ * Throws IllegalStateException when this description is read-only.
+ *@param index is the index to insert the pipeline stage before.
+ *@param pipelineStageConnectionName is the connection name.
+ *@param pipelineStageDescription is the description.
+ *@return the freshly created (empty) specification belonging to the new stage.
+ */
+ public OutputSpecification insertPipelineStage(int index, String pipelineStageConnectionName, String pipelineStageDescription)
+ {
+   if (readOnly)
+   {
+     throw new IllegalStateException("Attempt to change read-only object");
+   }
+   PipelineStage insertedStage = new PipelineStage(pipelineStageConnectionName,pipelineStageDescription);
+   pipelineStages.add(index,insertedStage);
+   OutputSpecification stageSpecification = insertedStage.getSpecification();
+   return stageSpecification;
+ }
+
+ /** Look up the connection name of one pipeline stage.
+ *@param index is the index of the pipeline stage whose connection name to get.
+ *@return the name of the connection used by that stage.
+ */
+ @Override
+ public String getPipelineStageConnectionName(int index)
+ {
+   PipelineStage stage = pipelineStages.get(index);
+   return stage.getConnectionName();
+ }
+
+ /** Look up the description of one pipeline stage.
+ *@param index is the index of the pipeline stage whose description to get.
+ *@return the description of that stage.
+ */
+ @Override
+ public String getPipelineStageDescription(int index)
+ {
+   PipelineStage stage = pipelineStages.get(index);
+   return stage.getDescription();
+ }
+
+ /** Look up the specification of one pipeline stage.
+ *@param index is the index of the pipeline stage whose specification is needed.
+ *@return the specification object held by that stage.
+ */
+ @Override
+ public OutputSpecification getPipelineStageSpecification(int index)
+ {
+   PipelineStage stage = pipelineStages.get(index);
+   return stage.getSpecification();
+ }
+
+ /** Delete a pipeline stage.
+ * Like the other pipeline mutators (clear, add, insert), this refuses to modify
+ * a read-only description and throws IllegalStateException instead.
+ *@param index is the index of the pipeline stage to delete.
+ */
+ @Override
+ public void deletePipelineStage(int index)
+ {
+   // Guard added for consistency: a read-only object must never change.
+   if (readOnly)
+     throw new IllegalStateException("Attempt to change read-only object");
+   pipelineStages.remove(index);
+ }
+
/** Set the job type.
*@param type is the type (as an integer).
*/
@@ -544,4 +634,40 @@ public class JobDescription implements I
rval.add(value);
}
+ /** Immutable holder for one pipeline stage: its connection name, its description,
+ * and the specification object associated with it.
+ */
+ protected static class PipelineStage
+ {
+   protected final String connectionName;
+   protected final String description;
+   protected final OutputSpecification specification;
+
+   /** Create a stage with a brand-new, empty specification.
+   *@param connectionName is the name of the stage's connection.
+   *@param description is the stage description.
+   */
+   public PipelineStage(String connectionName, String description)
+   {
+     this(connectionName,description,new OutputSpecification());
+   }
+
+   /** Create a stage around an existing specification.
+   *@param connectionName is the name of the stage's connection.
+   *@param description is the stage description.
+   *@param spec is the specification to hold; it is stored as-is, not copied.
+   */
+   public PipelineStage(String connectionName, String description, OutputSpecification spec)
+   {
+     this.connectionName = connectionName;
+     this.description = description;
+     this.specification = spec;
+   }
+
+   /** @return the specification held by this stage. */
+   public OutputSpecification getSpecification()
+   {
+     return this.specification;
+   }
+
+   /** @return the connection name of this stage. */
+   public String getConnectionName()
+   {
+     return this.connectionName;
+   }
+
+   /** @return the description of this stage. */
+   public String getDescription()
+   {
+     return this.description;
+   }
+ }
+
}
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Jun 9 23:19:08 2014
@@ -45,6 +45,12 @@ public class JobManager implements IJobM
protected final IDBInterface database;
protected final IOutputConnectionManager outputMgr;
protected final IRepositoryConnectionManager connectionMgr;
+ protected final ITransformationConnectionManager transformationMgr;
+
+ protected final IOutputConnectorManager outputConnectorMgr;
+ protected final IConnectorManager connectorMgr;
+ protected final ITransformationConnectorManager transformationConnectorMgr;
+
protected final IRepositoryConnectorPool repositoryConnectorPool;
protected final ILockManager lockManager;
protected final IThreadContext threadContext;
@@ -73,6 +79,10 @@ public class JobManager implements IJobM
eventManager = new EventManager(database);
outputMgr = OutputConnectionManagerFactory.make(threadContext);
connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+ transformationMgr = TransformationConnectionManagerFactory.make(threadContext);
+ outputConnectorMgr = OutputConnectorManagerFactory.make(threadContext);
+ connectorMgr = ConnectorManagerFactory.make(threadContext);
+ transformationConnectorMgr = TransformationConnectorManagerFactory.make(threadContext);
repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
lockManager = LockManagerFactory.make(threadContext);
}
@@ -83,7 +93,9 @@ public class JobManager implements IJobM
public void install()
throws ManifoldCFException
{
- jobs.install(outputMgr.getTableName(),outputMgr.getConnectionNameColumn(),connectionMgr.getTableName(),connectionMgr.getConnectionNameColumn());
+ jobs.install(transformationMgr.getTableName(),transformationMgr.getConnectionNameColumn(),
+ outputMgr.getTableName(),outputMgr.getConnectionNameColumn(),
+ connectionMgr.getTableName(),connectionMgr.getConnectionNameColumn());
jobQueue.install(jobs.getTableName(),jobs.idField);
hopCount.install(jobs.getTableName(),jobs.idField);
carryDown.install(jobs.getTableName(),jobs.idField);
@@ -109,16 +121,14 @@ public class JobManager implements IJobM
throws java.io.IOException, ManifoldCFException
{
// Write a version indicator
- ManifoldCF.writeDword(os,3);
+ ManifoldCF.writeDword(os,4);
// Get the job list
IJobDescription[] list = getAllJobs();
// Write the number of authorities
ManifoldCF.writeDword(os,list.length);
// Loop through the list and write the individual repository connection info
- int i = 0;
- while (i < list.length)
+ for (IJobDescription job : list)
{
- IJobDescription job = list[i++];
ManifoldCF.writeString(os,job.getConnectionName());
ManifoldCF.writeString(os,job.getOutputConnectionName());
ManifoldCF.writeString(os,job.getDescription());
@@ -135,10 +145,9 @@ public class JobManager implements IJobM
// Write schedule
int recCount = job.getScheduleRecordCount();
ManifoldCF.writeDword(os,recCount);
- int j = 0;
- while (j < recCount)
+ for (int j = 0; j < recCount; j++)
{
- ScheduleRecord sr = job.getScheduleRecord(j++);
+ ScheduleRecord sr = job.getScheduleRecord(j);
writeEnumeratedValues(os,sr.getDayOfWeek());
writeEnumeratedValues(os,sr.getMonthOfYear());
writeEnumeratedValues(os,sr.getDayOfMonth());
@@ -161,6 +170,29 @@ public class JobManager implements IJobM
ManifoldCF.writeString(os,linkType);
ManifoldCF.writeLong(os,hopcount);
}
+
+ // Write forced metadata information
+ Map<String,Set<String>> forcedMetadata = job.getForcedMetadata();
+ ManifoldCF.writeDword(os,forcedMetadata.size());
+ for (String key : forcedMetadata.keySet())
+ {
+ ManifoldCF.writeString(os,key);
+ Set<String> values = forcedMetadata.get(key);
+ ManifoldCF.writeDword(os,values.size());
+ for (String value : values)
+ {
+ ManifoldCF.writeString(os,value);
+ }
+ }
+
+ // Write pipeline information
+ ManifoldCF.writeDword(os,job.countPipelineStages());
+ for (int j = 0; j < job.countPipelineStages(); j++)
+ {
+ ManifoldCF.writeString(os,job.getPipelineStageConnectionName(j));
+ ManifoldCF.writeString(os,job.getPipelineStageDescription(j));
+ ManifoldCF.writeString(os,job.getPipelineStageSpecification(j).toXML());
+ }
}
}
@@ -187,11 +219,10 @@ public class JobManager implements IJobM
throws java.io.IOException, ManifoldCFException
{
int version = ManifoldCF.readDword(is);
- if (version != 2 && version != 3)
+ if (version != 2 && version != 3 && version != 4)
throw new java.io.IOException("Unknown job configuration version: "+Integer.toString(version));
int count = ManifoldCF.readDword(is);
- int i = 0;
- while (i < count)
+ for (int i = 0; i < count; i++)
{
IJobDescription job = createJob();
@@ -210,8 +241,7 @@ public class JobManager implements IJobM
// Read schedule
int recCount = ManifoldCF.readDword(is);
- int j = 0;
- while (j < recCount)
+ for (int j = 0; j < recCount; j++)
{
EnumeratedValues dayOfWeek = readEnumeratedValues(is);
EnumeratedValues monthOfYear = readEnumeratedValues(is);
@@ -230,23 +260,45 @@ public class JobManager implements IJobM
ScheduleRecord sr = new ScheduleRecord(dayOfWeek, monthOfYear, dayOfMonth, year,
hourOfDay, minutesOfHour, timezone, duration, requestMinimum);
job.addScheduleRecord(sr);
- j++;
}
// Read hop count filters
int hopFilterCount = ManifoldCF.readDword(is);
- j = 0;
- while (j < hopFilterCount)
+ for (int j = 0; j < hopFilterCount; j++)
{
String linkType = ManifoldCF.readString(is);
Long hopcount = ManifoldCF.readLong(is);
job.addHopCountFilter(linkType,hopcount);
- j++;
}
+ if (version >= 4)
+ {
+ // Read forced metadata information
+ int paramCount = ManifoldCF.readDword(is);
+ for (int j = 0; j < paramCount; j++)
+ {
+ String key = ManifoldCF.readString(is);
+ int valueCount = ManifoldCF.readDword(is);
+ for (int k = 0; k < valueCount; k++)
+ {
+ String value = ManifoldCF.readString(is);
+ job.addForcedMetadataValue(key,value);
+ }
+ }
+
+ // Read pipeline information
+ int pipelineCount = ManifoldCF.readDword(is);
+ for (int j = 0; j < pipelineCount; j++)
+ {
+ String connectionName = ManifoldCF.readString(is);
+ String description = ManifoldCF.readString(is);
+ String specification = ManifoldCF.readString(is);
+ job.addPipelineStage(connectionName,description).fromXML(specification);
+ }
+ }
+
// Attempt to save this job
save(job);
- i++;
}
}
@@ -275,7 +327,7 @@ public class JobManager implements IJobM
throws ManifoldCFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
int currentCount = 0;
int i = 0;
@@ -297,7 +349,7 @@ public class JobManager implements IJobM
/** Note deregistration for a batch of connection names.
*/
- protected void noteConnectionDeregistration(ArrayList list)
+ protected void noteConnectionDeregistration(List<String> list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -327,7 +379,7 @@ public class JobManager implements IJobM
throws ManifoldCFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
int currentCount = 0;
int i = 0;
@@ -349,7 +401,7 @@ public class JobManager implements IJobM
/** Note registration for a batch of connection names.
*/
- protected void noteConnectionRegistration(ArrayList list)
+ protected void noteConnectionRegistration(List<String> list)
throws ManifoldCFException
{
// Query for the matching jobs, and then for each job potentially adjust the state
@@ -369,17 +421,6 @@ public class JobManager implements IJobM
}
}
- /** Note a change in connection configuration.
- * This method will be called whenever a connection's configuration is modified, or when an external repository change
- * is signalled.
- */
- @Override
- public void noteConnectionChange(String connectionName)
- throws ManifoldCFException
- {
- jobs.noteConnectionChange(connectionName);
- }
-
/** Note the deregistration of an output connector used by the specified connections.
* This method will be called when the connector is deregistered. Jobs that use these connections
* must therefore enter appropriate states.
@@ -390,7 +431,7 @@ public class JobManager implements IJobM
throws ManifoldCFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
int currentCount = 0;
int i = 0;
@@ -412,7 +453,7 @@ public class JobManager implements IJobM
/** Note deregistration for a batch of output connection names.
*/
- protected void noteOutputConnectionDeregistration(ArrayList list)
+ protected void noteOutputConnectionDeregistration(List<String> list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -442,7 +483,7 @@ public class JobManager implements IJobM
throws ManifoldCFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
int currentCount = 0;
int i = 0;
@@ -464,7 +505,7 @@ public class JobManager implements IJobM
/** Note registration for a batch of output connection names.
*/
- protected void noteOutputConnectionRegistration(ArrayList list)
+ protected void noteOutputConnectionRegistration(List<String> list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -484,6 +525,129 @@ public class JobManager implements IJobM
}
}
+ /** Note the deregistration of a transformation connector used by the specified connections.
+ * This method will be called when the connector is deregistered. Jobs that use these connections
+ * must therefore enter appropriate states.
+ * The names are handed on in batches no larger than the database's conjunction-clause maximum.
+ *@param connectionNames is the set of connection names.
+ */
+ @Override
+ public void noteTransformationConnectorDeregistration(String[] connectionNames)
+   throws ManifoldCFException
+ {
+   // For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
+   List<String> list = new ArrayList<String>();
+   int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
+   for (String connectionName : connectionNames)
+   {
+     if (list.size() == maxCount)
+     {
+       // A full batch must go to the transformation handler; it was previously
+       // sent to the repository-connection handler by mistake.
+       noteTransformationConnectionDeregistration(list);
+       list.clear();
+     }
+     list.add(connectionName);
+   }
+   // Flush whatever is left over in the final, partial batch.
+   if (list.size() > 0)
+     noteTransformationConnectionDeregistration(list);
+ }
+
+ /** Note deregistration for one batch of transformation connection names.
+ * Looks up the jobs matching the given transformation connections, reads each job's
+ * status, and passes job id and status to the jobs table handler.
+ *@param list is the batch of transformation connection names.
+ */
+ protected void noteTransformationConnectionDeregistration(List<String> list)
+   throws ManifoldCFException
+ {
+   // Find the affected jobs first; their ids drive the status query below.
+   Long[] jobIDs = jobs.findJobsMatchingTransformations(list);
+   ArrayList queryParams = new ArrayList();
+   String statusQuery = "SELECT "+jobs.idField+","+jobs.statusField
+     +" FROM "+jobs.getTableName()+" WHERE "
+     +database.buildConjunctionClause(queryParams,new ClauseDescription[]{
+       new MultiClause(jobs.idField,jobIDs)})
+     +" FOR UPDATE";
+   IResultSet set = database.performQuery(statusQuery,queryParams,null,null);
+   for (int rowIndex = 0; rowIndex < set.getRowCount(); rowIndex++)
+   {
+     IResultRow row = set.getRow(rowIndex);
+     Long jobID = (Long)row.getValue(jobs.idField);
+     int statusValue = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+     jobs.noteTransformationConnectorDeregistration(jobID,statusValue);
+   }
+ }
+
+ /** Note the registration of a transformation connector used by the specified connections.
+ * This method will be called when a connector is registered, on which the specified
+ * connections depend.
+ * The names are handed on in batches no larger than the database's conjunction-clause maximum.
+ *@param connectionNames is the set of connection names.
+ */
+ @Override
+ public void noteTransformationConnectorRegistration(String[] connectionNames)
+   throws ManifoldCFException
+ {
+   // For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
+   List<String> list = new ArrayList<String>();
+   int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
+   for (String connectionName : connectionNames)
+   {
+     if (list.size() == maxCount)
+     {
+       // A full batch must be noted as a transformation *registration*; it was
+       // previously sent to the repository-connection deregistration handler by mistake.
+       noteTransformationConnectionRegistration(list);
+       list.clear();
+     }
+     list.add(connectionName);
+   }
+   // Flush whatever is left over in the final, partial batch.
+   if (list.size() > 0)
+     noteTransformationConnectionRegistration(list);
+ }
+
+ /** Note registration for one batch of transformation connection names.
+ * Looks up the jobs matching the given transformation connections, reads each job's
+ * status, and passes job id and status to the jobs table handler.
+ *@param list is the batch of transformation connection names.
+ */
+ protected void noteTransformationConnectionRegistration(List<String> list)
+   throws ManifoldCFException
+ {
+   // Find the affected jobs first; their ids drive the status query below.
+   Long[] jobIDs = jobs.findJobsMatchingTransformations(list);
+   ArrayList queryParams = new ArrayList();
+   String statusQuery = "SELECT "+jobs.idField+","+jobs.statusField
+     +" FROM "+jobs.getTableName()+" WHERE "
+     +database.buildConjunctionClause(queryParams,new ClauseDescription[]{
+       new MultiClause(jobs.idField,jobIDs)})
+     +" FOR UPDATE";
+   IResultSet set = database.performQuery(statusQuery,queryParams,null,null);
+   for (int rowIndex = 0; rowIndex < set.getRowCount(); rowIndex++)
+   {
+     IResultRow row = set.getRow(rowIndex);
+     Long jobID = (Long)row.getValue(jobs.idField);
+     int statusValue = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+     jobs.noteTransformationConnectorRegistration(jobID,statusValue);
+   }
+ }
+
+ /** Note a change in connection configuration.
+ * This method will be called whenever a connection's configuration is modified, or when an external repository change
+ * is signalled.
+ * The work is delegated entirely to the jobs table handler.
+ *@param connectionName is the name of the connection that changed.
+ */
+ @Override
+ public void noteConnectionChange(String connectionName)
+ throws ManifoldCFException
+ {
+ jobs.noteConnectionChange(connectionName);
+ }
+
/** Note a change in output connection configuration.
* This method will be called whenever a connection's configuration is modified, or when an external target config change
* is signalled.
@@ -495,6 +659,48 @@ public class JobManager implements IJobM
jobs.noteOutputConnectionChange(connectionName);
}
+ /** Note a change in transformation connection configuration.
+ * This method will be called whenever a connection's configuration is modified.
+ * The work is delegated entirely to the jobs table handler.
+ *@param connectionName is the name of the transformation connection that changed.
+ */
+ @Override
+ public void noteTransformationConnectionChange(String connectionName)
+ throws ManifoldCFException
+ {
+ jobs.noteTransformationConnectionChange(connectionName);
+ }
+
+ /** Assess jobs marked to be in need of assessment for connector status changes.
+ * The assessment runs inside a single database transaction; any ManifoldCFException,
+ * RuntimeException, or Error signals a rollback before being rethrown, and the
+ * transaction is always ended in the finally clause.
+ */
+ public void assessMarkedJobs()
+ throws ManifoldCFException
+ {
+ database.beginTransaction();
+ try
+ {
+ // Query for all jobs marked "ASSESSMENT_UNKNOWN".
+ jobs.assessMarkedJobs();
+ }
+ catch (ManifoldCFException e)
+ {
+ // Mark the transaction for rollback, then let the caller see the failure.
+ database.signalRollback();
+ throw e;
+ }
+ catch (RuntimeException e)
+ {
+ // Unchecked failures must roll back too.
+ database.signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ // Errors are treated the same way: roll back, then rethrow.
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ // Always close out the transaction, whether or not a rollback was signalled.
+ database.endTransaction();
+ }
+ }
+
/** Load a sorted list of job descriptions.
*@return the list, sorted by description.
*/
@@ -545,9 +751,7 @@ public class JobManager implements IJobM
IResultRow row = set.getRow(0);
int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString());
if (status == jobs.STATUS_ACTIVE || status == jobs.STATUS_ACTIVESEEDING ||
- status == jobs.STATUS_ACTIVE_UNINSTALLED || status == jobs.STATUS_ACTIVESEEDING_UNINSTALLED ||
- status == jobs.STATUS_ACTIVE_NOOUTPUT || status == jobs.STATUS_ACTIVESEEDING_NOOUTPUT ||
- status == jobs.STATUS_ACTIVE_NEITHER || status == jobs.STATUS_ACTIVESEEDING_NEITHER)
+ status == jobs.STATUS_ACTIVE_UNINSTALLED || status == jobs.STATUS_ACTIVESEEDING_UNINSTALLED)
throw new ManifoldCFException("Job "+id+" is active; you must shut it down before deleting it");
if (status != jobs.STATUS_INACTIVE)
throw new ManifoldCFException("Job "+id+" is busy; you must wait and/or shut it down before deleting it");
@@ -560,6 +764,11 @@ public class JobManager implements IJobM
database.signalRollback();
throw e;
}
+ catch (RuntimeException e)
+ {
+ database.signalRollback();
+ throw e;
+ }
catch (Error e)
{
database.signalRollback();
@@ -628,6 +837,17 @@ public class JobManager implements IJobM
return jobs.checkIfOutputReference(connectionName);
}
+ /** Determine whether a transformation connection name is referenced.
+ * The check is delegated to the jobs table handler.
+ *@param connectionName is the name of the connection.
+ *@return true if there is a reference, false otherwise.
+ */
+ @Override
+ public boolean checkIfTransformationReference(String connectionName)
+   throws ManifoldCFException
+ {
+   final boolean referenced = jobs.checkIfTransformationReference(connectionName);
+   return referenced;
+ }
+
/** Get the job IDs associated with a given connection name.
*@param connectionName is the name of the connection.
*@return the set of job id's associated with that connection.
@@ -1932,11 +2152,7 @@ public class JobManager implements IJobM
Jobs.statusToString(Jobs.STATUS_ACTIVE),
Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING),
Jobs.statusToString(Jobs.STATUS_ACTIVE_UNINSTALLED),
- Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED),
- Jobs.statusToString(Jobs.STATUS_ACTIVE_NOOUTPUT),
- Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NOOUTPUT),
- Jobs.statusToString(Jobs.STATUS_ACTIVE_NEITHER),
- Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NEITHER)
+ Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED)
}),
new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)}))
.append(") ");
@@ -2582,7 +2798,39 @@ public class JobManager implements IJobM
true)).append(" ")
.append(database.constructOffsetLimitClause(0,1,true));
- IResultSet set = database.performQuery(sb.toString(),list,null,null,1,null);
+ IResultSet set;
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
+ {
+ set = database.performQuery(sb.toString(),list,null,null,1,null);
+ break;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction adding document bins: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ }
+ }
+
if (set.getRowCount() > 0)
{
IResultRow row = set.getRow(0);
@@ -2659,6 +2907,8 @@ public class JobManager implements IJobM
"t0."+jobQueue.docPriorityField, "t0."+jobQueue.statusField, "t0."+jobQueue.checkActionField, "t0."+jobQueue.checkTimeField},
true)).append(" ");
+ String query = sb.toString();
+
// Before entering the transaction, we must provide the throttlelimit object with all the connector
// instances it could possibly need. The purpose for doing this is to prevent a deadlock where
// connector starvation causes database lockup.
@@ -2700,7 +2950,7 @@ public class JobManager implements IJobM
// Now we can tack the limit onto the query. Before this point, remainingDocuments would be crap
int limitValue = vList.getRemainingDocuments();
- sb.append(database.constructOffsetLimitClause(0,limitValue,true));
+ String finalQuery = query + database.constructOffsetLimitClause(0,limitValue,true);
if (Logging.perf.isDebugEnabled())
{
@@ -2711,7 +2961,7 @@ public class JobManager implements IJobM
database.beginTransaction();
try
{
- IResultSet set = database.performQuery(sb.toString(),list,null,null,-1,vList);
+ IResultSet set = database.performQuery(finalQuery,list,null,null,-1,vList);
if (Logging.perf.isDebugEnabled())
Logging.perf.debug(" Queuing "+Integer.toString(set.getRowCount())+" documents");
@@ -5634,10 +5884,6 @@ public class JobManager implements IJobM
jobs.statusToString(jobs.STATUS_ACTIVESEEDING),
jobs.statusToString(jobs.STATUS_ACTIVE_UNINSTALLED),
jobs.statusToString(jobs.STATUS_ACTIVESEEDING_UNINSTALLED),
- jobs.statusToString(jobs.STATUS_ACTIVE_NOOUTPUT),
- jobs.statusToString(jobs.STATUS_ACTIVESEEDING_NOOUTPUT),
- jobs.statusToString(jobs.STATUS_ACTIVE_NEITHER),
- jobs.statusToString(jobs.STATUS_ACTIVESEEDING_NEITHER),
jobs.statusToString(jobs.STATUS_PAUSED),
jobs.statusToString(jobs.STATUS_PAUSEDSEEDING)})})).append(" AND ")
.append(jobs.windowEndField).append("<? FOR UPDATE");
@@ -5661,8 +5907,6 @@ public class JobManager implements IJobM
{
case Jobs.STATUS_ACTIVE:
case Jobs.STATUS_ACTIVE_UNINSTALLED:
- case Jobs.STATUS_ACTIVE_NOOUTPUT:
- case Jobs.STATUS_ACTIVE_NEITHER:
jobs.waitJob(jobID,Jobs.STATUS_ACTIVEWAITING);
if (Logging.jobs.isDebugEnabled())
{
@@ -5671,8 +5915,6 @@ public class JobManager implements IJobM
break;
case Jobs.STATUS_ACTIVESEEDING:
case Jobs.STATUS_ACTIVESEEDING_UNINSTALLED:
- case Jobs.STATUS_ACTIVESEEDING_NOOUTPUT:
- case Jobs.STATUS_ACTIVESEEDING_NEITHER:
jobs.waitJob(jobID,Jobs.STATUS_ACTIVEWAITINGSEEDING);
if (Logging.jobs.isDebugEnabled())
{
@@ -7157,20 +7399,6 @@ public class JobManager implements IJobM
// Set the state of the job back to "Active"
jobs.writePermanentStatus(jobID,jobs.STATUS_ACTIVE_UNINSTALLED);
break;
- case Jobs.STATUS_ACTIVESEEDING_NOOUTPUT:
- if (Logging.jobs.isDebugEnabled())
- Logging.jobs.debug("Setting job "+jobID+" back to 'Active_NoOutput' state");
-
- // Set the state of the job back to "Active"
- jobs.writePermanentStatus(jobID,jobs.STATUS_ACTIVE_NOOUTPUT);
- break;
- case Jobs.STATUS_ACTIVESEEDING_NEITHER:
- if (Logging.jobs.isDebugEnabled())
- Logging.jobs.debug("Setting job "+jobID+" back to 'Active_Neither' state");
-
- // Set the state of the job back to "Active"
- jobs.writePermanentStatus(jobID,jobs.STATUS_ACTIVE_NEITHER);
- break;
case Jobs.STATUS_ACTIVESEEDING:
if (Logging.jobs.isDebugEnabled())
Logging.jobs.debug("Setting job "+jobID+" back to 'Active' state");
@@ -7221,8 +7449,6 @@ public class JobManager implements IJobM
case Jobs.STATUS_ABORTINGFORRESTARTMINIMAL:
case Jobs.STATUS_ACTIVE:
case Jobs.STATUS_ACTIVE_UNINSTALLED:
- case Jobs.STATUS_ACTIVE_NOOUTPUT:
- case Jobs.STATUS_ACTIVE_NEITHER:
case Jobs.STATUS_PAUSED:
case Jobs.STATUS_ACTIVEWAIT:
case Jobs.STATUS_PAUSEDWAIT:
@@ -7399,9 +7625,7 @@ public class JobManager implements IJobM
new MultiClause(jobs.statusField,new Object[]{
jobs.statusToString(jobs.STATUS_ACTIVE),
jobs.statusToString(jobs.STATUS_ACTIVEWAIT),
- jobs.statusToString(jobs.STATUS_ACTIVE_UNINSTALLED),
- jobs.statusToString(jobs.STATUS_ACTIVE_NOOUTPUT),
- jobs.statusToString(jobs.STATUS_ACTIVE_NEITHER)})}))
+ jobs.statusToString(jobs.STATUS_ACTIVE_UNINSTALLED)})}))
.append(" FOR UPDATE");
IResultSet set = database.performQuery(sb.toString(),list,null,null);
@@ -7930,10 +8154,6 @@ public class JobManager implements IJobM
Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING),
Jobs.statusToString(Jobs.STATUS_ACTIVE_UNINSTALLED),
Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED),
- Jobs.statusToString(Jobs.STATUS_ACTIVE_NOOUTPUT),
- Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NOOUTPUT),
- Jobs.statusToString(Jobs.STATUS_ACTIVE_NEITHER),
- Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NEITHER),
Jobs.statusToString(Jobs.STATUS_PAUSED),
Jobs.statusToString(Jobs.STATUS_PAUSEDSEEDING),
Jobs.statusToString(Jobs.STATUS_ACTIVEWAIT),
@@ -8066,10 +8286,6 @@ public class JobManager implements IJobM
break;
case Jobs.STATUS_ACTIVE_UNINSTALLED:
case Jobs.STATUS_ACTIVESEEDING_UNINSTALLED:
- case Jobs.STATUS_ACTIVE_NOOUTPUT:
- case Jobs.STATUS_ACTIVESEEDING_NOOUTPUT:
- case Jobs.STATUS_ACTIVE_NEITHER:
- case Jobs.STATUS_ACTIVESEEDING_NEITHER:
rstatus = JobStatus.JOBSTATUS_RUNNING_UNINSTALLED;
break;
case Jobs.STATUS_ACTIVE:
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Mon Jun 9 23:19:08 2014
@@ -57,6 +57,7 @@ import java.util.*;
* <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
* <tr><td>failtime</td><td>BIGINT</td><td></td></tr>
* <tr><td>failcount</td><td>BIGINT</td><td></td></tr>
+ * <tr><td>assessmentstate</td><td>CHAR(1)</td><td></td></tr>
* </table>
* <br><br>
*
@@ -124,12 +125,18 @@ public class Jobs extends org.apache.man
// connector exists before deciding what state to put the job into.
public static final int STATUS_ACTIVE_UNINSTALLED = 38; // Active, but repository connector not installed
public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 39; // Active and seeding, but repository connector not installed
- public static final int STATUS_ACTIVE_NOOUTPUT = 40; // Active, but output connector not installed
- public static final int STATUS_ACTIVESEEDING_NOOUTPUT = 41; // Active and seeding, but output connector not installed
- public static final int STATUS_ACTIVE_NEITHER = 42; // Active, but neither repository connector nor output connector installed
- public static final int STATUS_ACTIVESEEDING_NEITHER = 43; // Active and seeding, but neither repository connector nor output connector installed
- public static final int STATUS_DELETING_NOOUTPUT = 44; // Job is being deleted but there's no output connector installed
+ public static final int STATUS_DELETING_NOOUTPUT = 40; // Job is being deleted but there's no output connector installed
+ // Deprecated states. These states should never be used; they're defined only for upgrade purposes
+ public static final int STATUS_ACTIVE_NOOUTPUT = 100; // Active, but output connector not installed
+ public static final int STATUS_ACTIVESEEDING_NOOUTPUT = 101; // Active and seeding, but output connector not installed
+ public static final int STATUS_ACTIVE_NEITHER = 102; // Active, but neither repository connector nor output connector installed
+ public static final int STATUS_ACTIVESEEDING_NEITHER = 103; // Active and seeding, but neither repository connector nor output connector installed
+
+ // Need Connector Assessment states
+ public static final int ASSESSMENT_KNOWN = 0; // State is known.
+ public static final int ASSESSMENT_UNKNOWN = 1; // State is unknown, and job needs assessment
+
// Type field values
public static final int TYPE_CONTINUOUS = IJobDescription.TYPE_CONTINUOUS;
public static final int TYPE_SPECIFIED = IJobDescription.TYPE_SPECIFIED;
@@ -192,14 +199,18 @@ public class Jobs extends org.apache.man
public static final String failTimeField = "failtime";
/** When non-null, indicates the number of retries remaining, after which the attempt will be considered to have actually failed */
public static final String failCountField = "failcount";
-
- protected static Map statusMap;
- protected static Map typeMap;
- protected static Map startMap;
- protected static Map hopmodeMap;
+ /** Set to N when the job needs connector-installed assessment */
+ public static final String assessmentStateField = "assessmentstate";
+
+ protected static Map<String,Integer> statusMap;
+ protected static Map<String,Integer> typeMap;
+ protected static Map<String,Integer> startMap;
+ protected static Map<String,Integer> hopmodeMap;
+ protected static Map<String,Integer> assessmentMap;
+
static
{
- statusMap = new HashMap();
+ statusMap = new HashMap<String,Integer>();
statusMap.put("N",new Integer(STATUS_INACTIVE));
statusMap.put("A",new Integer(STATUS_ACTIVE));
statusMap.put("P",new Integer(STATUS_PAUSED));
@@ -247,19 +258,23 @@ public class Jobs extends org.apache.man
statusMap.put("u",new Integer(STATUS_ACTIVESEEDING_NEITHER));
statusMap.put("D",new Integer(STATUS_DELETING_NOOUTPUT));
- typeMap = new HashMap();
+ typeMap = new HashMap<String,Integer>();
typeMap.put("C",new Integer(TYPE_CONTINUOUS));
typeMap.put("S",new Integer(TYPE_SPECIFIED));
- startMap = new HashMap();
+ startMap = new HashMap<String,Integer>();
startMap.put("B",new Integer(START_WINDOWBEGIN));
startMap.put("I",new Integer(START_WINDOWINSIDE));
startMap.put("D",new Integer(START_DISABLE));
- hopmodeMap = new HashMap();
+ hopmodeMap = new HashMap<String,Integer>();
hopmodeMap.put("A",new Integer(HOPCOUNT_ACCURATE));
hopmodeMap.put("N",new Integer(HOPCOUNT_NODELETE));
hopmodeMap.put("V",new Integer(HOPCOUNT_NEVERDELETE));
+
+ assessmentMap = new HashMap<String,Integer>();
+ assessmentMap.put("Y",new Integer(ASSESSMENT_KNOWN));
+ assessmentMap.put("N",new Integer(ASSESSMENT_UNKNOWN));
}
/* Transient vs. non-transient states
@@ -321,15 +336,17 @@ public class Jobs extends org.apache.man
*/
// Local variables
- protected ICacheManager cacheManager;
- protected ScheduleManager scheduleManager;
- protected HopFilterManager hopFilterManager;
- protected ForcedParamManager forcedParamManager;
-
- protected IOutputConnectionManager outputMgr;
- protected IRepositoryConnectionManager connectionMgr;
-
- protected IThreadContext threadContext;
+ protected final ICacheManager cacheManager;
+ protected final ScheduleManager scheduleManager;
+ protected final HopFilterManager hopFilterManager;
+ protected final ForcedParamManager forcedParamManager;
+ protected final PipelineManager pipelineManager;
+
+ protected final IOutputConnectionManager outputMgr;
+ protected final IRepositoryConnectionManager connectionMgr;
+ protected final ITransformationConnectionManager transMgr;
+
+ protected final IThreadContext threadContext;
/** Constructor.
*@param database is the database handle.
@@ -342,16 +359,20 @@ public class Jobs extends org.apache.man
scheduleManager = new ScheduleManager(threadContext,database);
hopFilterManager = new HopFilterManager(threadContext,database);
forcedParamManager = new ForcedParamManager(threadContext,database);
+ pipelineManager = new PipelineManager(threadContext,database);
cacheManager = CacheManagerFactory.make(threadContext);
outputMgr = OutputConnectionManagerFactory.make(threadContext);
connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+ transMgr = TransformationConnectionManagerFactory.make(threadContext);
}
/** Install or upgrade this table.
*/
- public void install(String outputTableName, String outputNameField, String connectionTableName, String connectionNameField)
+ public void install(String transTableName, String transNameField,
+ String outputTableName, String outputNameField,
+ String connectionTableName, String connectionNameField)
throws ManifoldCFException
{
// Standard practice: Have a loop around everything, in case upgrade needs it.
@@ -386,6 +407,7 @@ public class Jobs extends org.apache.man
map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
map.put(failTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
map.put(failCountField,new ColumnDescription("BIGINT",false,true,null,null,false));
+ map.put(assessmentStateField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
performCreate(map,null);
}
else
@@ -415,9 +437,33 @@ public class Jobs extends org.apache.man
insertMap.put(failCountField,new ColumnDescription("BIGINT",false,true,null,null,false));
performAlter(insertMap,null,null,null);
}
+ if (existing.get(assessmentStateField) == null)
+ {
+ Map insertMap = new HashMap();
+ insertMap.put(assessmentStateField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
+ performAlter(insertMap,null,null,null);
+ String query;
+ ArrayList list = new ArrayList();
+ HashMap map = new HashMap();
+ query = buildConjunctionClause(list,new ClauseDescription[]{
+ new MultiClause(statusField,new Object[]{
+ statusToString(STATUS_ACTIVE_NOOUTPUT),
+ statusToString(STATUS_ACTIVE_NEITHER)})});
+ map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
+ performUpdate(map,"WHERE "+query,list,null);
+ list.clear();
+ map.clear();
+ query = buildConjunctionClause(list,new ClauseDescription[]{
+ new MultiClause(statusField,new Object[]{
+ statusToString(STATUS_ACTIVESEEDING_NOOUTPUT),
+ statusToString(STATUS_ACTIVESEEDING_NEITHER)})});
+ map.put(statusField,statusToString(STATUS_ACTIVESEEDING_UNINSTALLED));
+ performUpdate(map,"WHERE "+query,list,null);
+ }
}
// Handle related tables
+ pipelineManager.install(getTableName(),idField,transTableName,transNameField);
scheduleManager.install(getTableName(),idField);
hopFilterManager.install(getTableName(),idField);
forcedParamManager.install(getTableName(),idField);
@@ -480,6 +526,7 @@ public class Jobs extends org.apache.man
forcedParamManager.deinstall();
hopFilterManager.deinstall();
scheduleManager.deinstall();
+ pipelineManager.deinstall();
performDrop(null);
}
catch (ManifoldCFException e)
@@ -516,6 +563,27 @@ public class Jobs extends org.apache.man
analyzeTable();
}
+ /** Find the jobs that match the specified transformation connection names.
+ * The match condition is not built here: pipelineManager.buildQueryClause() appends it
+ * (text and parameters) inside an EXISTS(...) subquery that is handed this table's id column,
+ * qualified with the alias "t1".
+ *@param transformationConnectionNames is the list of transformation connection names to match.
+ *@return the identifiers of the matching jobs, one per result row, in result-set order.
+ */
+ public Long[] findJobsMatchingTransformations(List<String> transformationConnectionNames)
+ throws ManifoldCFException
+ {
+ ArrayList queryArgs = new ArrayList();
+ StringBuilder sql = new StringBuilder("SELECT ");
+ sql.append(idField).append(" FROM ").append(getTableName()).append(" t1 WHERE EXISTS(");
+ // The pipeline manager appends its clause and its parameters in place.
+ pipelineManager.buildQueryClause(sql,queryArgs,"t1."+idField,transformationConnectionNames);
+ sql.append(")");
+
+ IResultSet matches = performQuery(sql.toString(),queryArgs,null,null);
+ int matchCount = matches.getRowCount();
+ Long[] jobIDs = new Long[matchCount];
+ int index = 0;
+ while (index < matchCount)
+ {
+ jobIDs[index] = (Long)matches.getRow(index).getValue(idField);
+ index++;
+ }
+ return jobIDs;
+ }
+
/** Read schedule records for a specified set of jobs. Cannot use caching!
*/
public ScheduleRecord[][] readScheduleRecords(Long[] jobIDs)
@@ -837,12 +905,19 @@ public class Jobs extends org.apache.man
IResultRow row = set.getRow(0);
- boolean isSame = true;
-
// Determine whether we need to reset the scan time for documents.
// Basically, any change to job parameters that could affect ingestion should clear isSame so that we
// relook at all the documents, not just the recent ones.
+ boolean isSame = pipelineManager.compareRows(id,jobDescription);
+ if (!isSame)
+ {
+ int currentStatus = stringToStatus((String)row.getValue(statusField));
+ if (currentStatus == STATUS_ACTIVE || currentStatus == STATUS_ACTIVESEEDING ||
+ currentStatus == STATUS_ACTIVE_UNINSTALLED || currentStatus == STATUS_ACTIVESEEDING_UNINSTALLED)
+ values.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
+ }
+
if (isSame)
{
String oldOutputSpecXML = (String)row.getValue(outputSpecField);
@@ -858,32 +933,11 @@ public class Jobs extends org.apache.man
}
if (isSame)
- {
- // Compare hopcount filter criteria.
- Map filterRows = hopFilterManager.readRows(id);
- Map newFilterRows = jobDescription.getHopCountFilters();
- if (filterRows.size() != newFilterRows.size())
- isSame = false;
- else
- {
- for (String linkType : (Collection<String>)filterRows.keySet())
- {
- Long oldCount = (Long)filterRows.get(linkType);
- Long newCount = (Long)newFilterRows.get(linkType);
- if (oldCount == null || newCount == null)
- {
- isSame = false;
- break;
- }
- else if (oldCount.longValue() != newCount.longValue())
- {
- isSame = false;
- break;
- }
- }
- }
- }
-
+ isSame = hopFilterManager.compareRows(id,jobDescription);
+
+ if (isSame)
+ isSame = forcedParamManager.compareRows(id,jobDescription);
+
if (!isSame)
values.put(lastCheckTimeField,null);
@@ -891,6 +945,7 @@ public class Jobs extends org.apache.man
query = buildConjunctionClause(params,new ClauseDescription[]{
new UnitaryClause(idField,id)});
performUpdate(values," WHERE "+query,params,null);
+ pipelineManager.deleteRows(id);
scheduleManager.deleteRows(id);
hopFilterManager.deleteRows(id);
forcedParamManager.deleteRows(id);
@@ -907,6 +962,8 @@ public class Jobs extends org.apache.man
performInsert(values,null);
}
+ // Write pipeline rows
+ pipelineManager.writeRows(id,jobDescription);
// Write schedule records
scheduleManager.writeRows(id,jobDescription);
// Write hop filter rows
@@ -1112,19 +1169,6 @@ public class Jobs extends org.apache.man
map.put(processIDField,null);
performUpdate(map,"WHERE "+query,list,invKey);
list.clear();
- query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT)),
- new UnitaryClause(processIDField,processID)});
- map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
- map.put(processIDField,null);
- performUpdate(map,"WHERE "+query,list,invKey);
- list.clear();
- query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER)),
- new UnitaryClause(processIDField,processID)});
- map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
- map.put(processIDField,null);
- performUpdate(map,"WHERE "+query,list,invKey);
}
@@ -1256,18 +1300,6 @@ public class Jobs extends org.apache.man
map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
map.put(processIDField,null);
performUpdate(map,"WHERE "+query,list,invKey);
- list.clear();
- query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT))});
- map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
- map.put(processIDField,null);
- performUpdate(map,"WHERE "+query,list,invKey);
- list.clear();
- query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER))});
- map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
- map.put(processIDField,null);
- performUpdate(map,"WHERE "+query,list,invKey);
}
@@ -1282,11 +1314,29 @@ public class Jobs extends org.apache.man
performUpdate(map,"WHERE "+query,list,null);
}
- /** Signal to a job that its underlying output connector has gone away.
+ /** Flag a job as needing connector assessment, as a result of a connector registration.
+ * Only jobs whose old status is STATUS_ACTIVE_UNINSTALLED, STATUS_ACTIVESEEDING_UNINSTALLED or
+ * STATUS_DELETING_NOOUTPUT are touched; for any other status this method does nothing.
+ *@param jobID is the identifier of the job.
+ *@param oldStatusValue is the current status value for the job.
+ */
+ public void invalidateCurrentUnregisteredState(Long jobID, int oldStatusValue)
+ throws ManifoldCFException
+ {
+ // Only states that depend on the connector state need a fresh assessment.
+ switch (oldStatusValue)
+ {
+ case STATUS_ACTIVE_UNINSTALLED:
+ case STATUS_ACTIVESEEDING_UNINSTALLED:
+ case STATUS_DELETING_NOOUTPUT:
+ break;
+ default:
+ return;
+ }
+
+ ArrayList params = new ArrayList();
+ String whereCondition = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(idField,jobID)});
+ HashMap updates = new HashMap();
+ updates.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
+ // Assessment state is not cached, so no invalidation keys are passed.
+ performUpdate(updates,"WHERE "+whereCondition,params,null);
+ }
+
+ /** Signal to a job that an underlying transformation connector has gone away.
*@param jobID is the identifier of the job.
*@param oldStatusValue is the current status value for the job.
*/
- public void noteOutputConnectorDeregistration(Long jobID, int oldStatusValue)
+ public void noteTransformationConnectorDeregistration(Long jobID, int oldStatusValue)
throws ManifoldCFException
{
int newStatusValue;
@@ -1299,19 +1349,10 @@ public class Jobs extends org.apache.man
switch (oldStatusValue)
{
case STATUS_ACTIVE:
- newStatusValue = STATUS_ACTIVE_NOOUTPUT;
+ newStatusValue = STATUS_ACTIVE_UNINSTALLED;
break;
case STATUS_ACTIVESEEDING:
- newStatusValue = STATUS_ACTIVESEEDING_NOOUTPUT;
- break;
- case STATUS_ACTIVE_UNINSTALLED:
- newStatusValue = STATUS_ACTIVE_NEITHER;
- break;
- case STATUS_ACTIVESEEDING_UNINSTALLED:
- newStatusValue = STATUS_ACTIVESEEDING_NEITHER;
- break;
- case STATUS_DELETING:
- newStatusValue = STATUS_DELETING_NOOUTPUT;
+ newStatusValue = STATUS_ACTIVESEEDING_UNINSTALLED;
break;
default:
newStatusValue = oldStatusValue;
@@ -1324,38 +1365,47 @@ public class Jobs extends org.apache.man
HashMap newValues = new HashMap();
newValues.put(statusField,statusToString(newStatusValue));
+ newValues.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,jobID)});
performUpdate(newValues,"WHERE "+query,list,invKey);
}
- /** Signal to a job that its underlying output connector has returned.
+ /** Signal to a job that an underlying transformation connector has been registered.
*@param jobID is the identifier of the job.
*@param oldStatusValue is the current status value for the job.
*/
- public void noteOutputConnectorRegistration(Long jobID, int oldStatusValue)
+ public void noteTransformationConnectorRegistration(Long jobID, int oldStatusValue)
+ throws ManifoldCFException
+ {
+ invalidateCurrentUnregisteredState(jobID,oldStatusValue);
+ }
+
+ /** Signal to a job that its underlying output connector has gone away.
+ *@param jobID is the identifier of the job.
+ *@param oldStatusValue is the current status value for the job.
+ */
+ public void noteOutputConnectorDeregistration(Long jobID, int oldStatusValue)
throws ManifoldCFException
{
int newStatusValue;
- // The following states are special, in that when the underlying connector returns, the jobs
- // in such states are switched back to their connector-installed value.
+ // The following states are special, in that when the underlying connector goes away, the jobs
+ // in such states are switched away to something else. There are TWO reasons that a state may be in
+ // this category: EITHER we don't want the job in this state to be treated in the same way if its
+ // connector is uninstalled, OR we need feedback for the user interface. If it's the latter situation,
+ // then all usages of the corresponding states will be identical - and that's in fact precisely where we
+ // start with in all the code.
switch (oldStatusValue)
{
- case STATUS_ACTIVE_NOOUTPUT:
- newStatusValue = STATUS_ACTIVE;
- break;
- case STATUS_ACTIVESEEDING_NOOUTPUT:
- newStatusValue = STATUS_ACTIVESEEDING;
- break;
- case STATUS_ACTIVE_NEITHER:
+ case STATUS_ACTIVE:
newStatusValue = STATUS_ACTIVE_UNINSTALLED;
break;
- case STATUS_ACTIVESEEDING_NEITHER:
+ case STATUS_ACTIVESEEDING:
newStatusValue = STATUS_ACTIVESEEDING_UNINSTALLED;
break;
- case STATUS_DELETING_NOOUTPUT:
- newStatusValue = STATUS_DELETING;
+ case STATUS_DELETING:
+ newStatusValue = STATUS_DELETING_NOOUTPUT;
break;
default:
newStatusValue = oldStatusValue;
@@ -1368,12 +1418,38 @@ public class Jobs extends org.apache.man
HashMap newValues = new HashMap();
newValues.put(statusField,statusToString(newStatusValue));
+ newValues.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,jobID)});
performUpdate(newValues,"WHERE "+query,list,invKey);
}
+
+ /** Signal to a job that its underlying output connector has returned.
+ * A job in STATUS_DELETING_NOOUTPUT is switched straight to STATUS_DELETING and marked as assessed.
+ * For every other status the decision is handed to invalidateCurrentUnregisteredState().
+ *@param jobID is the identifier of the job.
+ *@param oldStatusValue is the current status value for the job.
+ */
+ public void noteOutputConnectorRegistration(Long jobID, int oldStatusValue)
+ throws ManifoldCFException
+ {
+ if (oldStatusValue != STATUS_DELETING_NOOUTPUT)
+ {
+ // We don't know the state, and can't do the work now because we'd deadlock
+ invalidateCurrentUnregisteredState(jobID,oldStatusValue);
+ return;
+ }
+
+ // The state transition can be done right now.
+ StringSet statusInvalidation = new StringSet(getJobStatusKey());
+ ArrayList params = new ArrayList();
+ String whereCondition = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(idField,jobID)});
+ HashMap updates = new HashMap();
+ updates.put(statusField,statusToString(STATUS_DELETING));
+ updates.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
+ performUpdate(updates,"WHERE "+whereCondition,params,statusInvalidation);
+ }
+
+
/** Signal to a job that its underlying connector has gone away.
*@param jobID is the identifier of the job.
*@param oldStatusValue is the current status value for the job.
@@ -1396,12 +1472,6 @@ public class Jobs extends org.apache.man
case STATUS_ACTIVESEEDING:
newStatusValue = STATUS_ACTIVESEEDING_UNINSTALLED;
break;
- case STATUS_ACTIVE_NOOUTPUT:
- newStatusValue = STATUS_ACTIVE_NEITHER;
- break;
- case STATUS_ACTIVESEEDING_NOOUTPUT:
- newStatusValue = STATUS_ACTIVESEEDING_NEITHER;
- break;
default:
newStatusValue = oldStatusValue;
break;
@@ -1413,12 +1483,13 @@ public class Jobs extends org.apache.man
HashMap newValues = new HashMap();
newValues.put(statusField,statusToString(newStatusValue));
+ newValues.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,jobID)});
performUpdate(newValues,"WHERE "+query,list,invKey);
}
-
+
/** Signal to a job that its underlying connector has returned.
*@param jobID is the identifier of the job.
*@param oldStatusValue is the current status value for the job.
@@ -1426,40 +1497,9 @@ public class Jobs extends org.apache.man
public void noteConnectorRegistration(Long jobID, int oldStatusValue)
throws ManifoldCFException
{
- int newStatusValue;
- // The following states are special, in that when the underlying connector returns, the jobs
- // in such states are switched back to their connector-installed value.
- switch (oldStatusValue)
- {
- case STATUS_ACTIVE_UNINSTALLED:
- newStatusValue = STATUS_ACTIVE;
- break;
- case STATUS_ACTIVESEEDING_UNINSTALLED:
- newStatusValue = STATUS_ACTIVESEEDING;
- break;
- case STATUS_ACTIVE_NEITHER:
- newStatusValue = STATUS_ACTIVE_NOOUTPUT;
- break;
- case STATUS_ACTIVESEEDING_NEITHER:
- newStatusValue = STATUS_ACTIVESEEDING_NOOUTPUT;
- break;
- default:
- newStatusValue = oldStatusValue;
- break;
- }
- if (newStatusValue == oldStatusValue)
- return;
-
- StringSet invKey = new StringSet(getJobStatusKey());
-
- HashMap newValues = new HashMap();
- newValues.put(statusField,statusToString(newStatusValue));
- ArrayList list = new ArrayList();
- String query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(idField,jobID)});
- performUpdate(newValues,"WHERE "+query,list,invKey);
+ invalidateCurrentUnregisteredState(jobID,oldStatusValue);
}
-
+
/** Note a change in connection configuration.
* This method will be called whenever a connection's configuration is modified, or when an external repository change
* is signalled.
@@ -1491,6 +1531,22 @@ public class Jobs extends org.apache.man
new UnitaryClause(outputNameField,connectionName)});
performUpdate(newValues,"WHERE "+query,list,null);
}
+
+ /** Note a change in transformation connection configuration.
+ * Sets the last-check time to null on every job row for which a pipeline row exists that
+ * joins to the job's id and names the given connection.
+ *@param connectionName is the name of the transformation connection that changed.
+ */
+ public void noteTransformationConnectionChange(String connectionName)
+ throws ManifoldCFException
+ {
+ // Condition for the correlated subquery: pipeline row belongs to this job AND names the connection.
+ ArrayList params = new ArrayList();
+ ClauseDescription[] conditions = new ClauseDescription[]{
+ new JoinClause(getTableName()+"."+idField,pipelineManager.ownerIDField),
+ new UnitaryClause(pipelineManager.transformationNameField,connectionName)};
+ String subqueryCondition = buildConjunctionClause(params,conditions);
+ String whereClause = "WHERE EXISTS(SELECT 'x' FROM "+pipelineManager.getTableName()+" WHERE "+subqueryCondition+")";
+
+ // Only the last-check time changes, not the status, so no cache keys are invalidated.
+ HashMap updates = new HashMap();
+ updates.put(lastCheckTimeField,null);
+ performUpdate(updates,whereClause,params,null);
+ }
/** Check whether a job's status indicates that it is in ACTIVE or ACTIVESEEDING state.
*/
@@ -1683,20 +1739,6 @@ public class Jobs extends org.apache.man
map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
map.put(processIDField,null);
performUpdate(map,"WHERE "+query,list,invKey);
- list.clear();
- query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT)),
- new UnitaryClause(processIDField,processID)});
- map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
- map.put(processIDField,null);
- performUpdate(map,"WHERE "+query,list,invKey);
- list.clear();
- query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER)),
- new UnitaryClause(processIDField,processID)});
- map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
- map.put(processIDField,null);
- performUpdate(map,"WHERE "+query,list,invKey);
}
@@ -1769,12 +1811,6 @@ public class Jobs extends org.apache.man
case STATUS_ACTIVESEEDING_UNINSTALLED:
newStatus = STATUS_ACTIVE_UNINSTALLED;
break;
- case STATUS_ACTIVESEEDING_NOOUTPUT:
- newStatus = STATUS_ACTIVE_NOOUTPUT;
- break;
- case STATUS_ACTIVESEEDING_NEITHER:
- newStatus = STATUS_ACTIVE_NEITHER;
- break;
case STATUS_ACTIVEWAITSEEDING:
newStatus = STATUS_ACTIVEWAIT;
break;
@@ -1873,6 +1909,67 @@ public class Jobs extends org.apache.man
performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
}
+ /** Assess all marked jobs to determine if they can be reactivated.
+ * Every job whose assessment state is ASSESSMENT_UNKNOWN is examined in turn:
+ * <ul>
+ * <li>STATUS_DELETING_NOOUTPUT becomes STATUS_DELETING when the output connector exists;</li>
+ * <li>STATUS_ACTIVE_UNINSTALLED becomes STATUS_ACTIVE, and STATUS_ACTIVESEEDING_UNINSTALLED becomes
+ * STATUS_ACTIVESEEDING, when the output, repository and all transformation connectors exist;</li>
+ * <li>in every other case the status is left as it is.</li>
+ * </ul>
+ * Each examined job is then marked ASSESSMENT_KNOWN, whether or not its status changed, so that
+ * it is not selected again until something sets its assessment state back to ASSESSMENT_UNKNOWN.
+ * A job that cannot be reactivated never prevents the remaining marked jobs from being assessed.
+ */
+ public void assessMarkedJobs()
+ throws ManifoldCFException
+ {
+ ArrayList newList = new ArrayList();
+ String query = buildConjunctionClause(newList,new ClauseDescription[]{
+ new UnitaryClause(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN))});
+ // Query for the matching jobs, and then for each job potentially adjust the state based on the connector status
+ IResultSet set = performQuery("SELECT "+idField+","+statusField+","+connectionNameField+","+outputNameField+" FROM "+
+ getTableName()+" WHERE "+query+" FOR UPDATE",
+ newList,null,null);
+ for (int i = 0; i < set.getRowCount(); i++)
+ {
+ IResultRow row = set.getRow(i);
+ Long jobID = (Long)row.getValue(idField);
+ String outputName = (String)row.getValue(outputNameField);
+ String connectionName = (String)row.getValue(connectionNameField);
+ String[] transformationNames = pipelineManager.getConnectionNames(jobID);
+ int statusValue = stringToStatus((String)row.getValue(statusField));
+ // Unless the connector checks below succeed, the job keeps the status it already has.
+ int newValue = statusValue;
+
+ // Based on status value, see what we need to know to determine if the state can be switched
+ switch (statusValue)
+ {
+ case STATUS_DELETING_NOOUTPUT:
+ if (outputMgr.checkConnectorExists(outputName))
+ newValue = STATUS_DELETING;
+ break;
+ case STATUS_ACTIVE_UNINSTALLED:
+ if (outputMgr.checkConnectorExists(outputName) &&
+ connectionMgr.checkConnectorExists(connectionName) &&
+ checkTransformationsInstalled(transformationNames))
+ newValue = STATUS_ACTIVE;
+ break;
+ case STATUS_ACTIVESEEDING_UNINSTALLED:
+ if (outputMgr.checkConnectorExists(outputName) &&
+ connectionMgr.checkConnectorExists(connectionName) &&
+ checkTransformationsInstalled(transformationNames))
+ newValue = STATUS_ACTIVESEEDING;
+ break;
+ default:
+ // Not a state this assessment can change; fall through to clear the marker only.
+ break;
+ }
+
+ ArrayList list = new ArrayList();
+ String updateQuery = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(idField,jobID)});
+ HashMap map = new HashMap();
+ // Write the new status as well as the marker; otherwise the job would be recorded as assessed
+ // while still sitting in its connector-missing state.
+ if (newValue != statusValue)
+ map.put(statusField,statusToString(newValue));
+ map.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
+ performUpdate(map,"WHERE "+updateQuery,list,new StringSet(getJobStatusKey()));
+ }
+
+ }
+
/** Put job back into active state, from the shutting-down state.
*@param jobID is the job identifier.
*/
@@ -1889,26 +1986,24 @@ public class Jobs extends org.apache.man
query+" FOR UPDATE",list,null,null);
if (set.getRowCount() == 0)
throw new ManifoldCFException("Can't find job "+jobID.toString());
+
IResultRow row = set.getRow(0);
+
int status = stringToStatus((String)row.getValue(statusField));
int newStatus;
switch (status)
{
case STATUS_SHUTTINGDOWN:
- if (connectionMgr.checkConnectorExists((String)row.getValue(connectionNameField)))
- {
- if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
- newStatus = STATUS_ACTIVE;
- else
- newStatus = STATUS_ACTIVE_NOOUTPUT;
- }
+ String[] transformationConnectionNames = pipelineManager.getConnectionNames(jobID);
+ String connectionName = (String)row.getValue(connectionNameField);
+ String outputName = (String)row.getValue(outputNameField);
+ // Want either STATUS_ACTIVE, or STATUS_ACTIVE_UNINSTALLED
+ if (!checkTransformationsInstalled(transformationConnectionNames) ||
+ !connectionMgr.checkConnectorExists(connectionName) ||
+ !outputMgr.checkConnectorExists(outputName))
+ newStatus = STATUS_ACTIVE_UNINSTALLED;
else
- {
- if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
- newStatus = STATUS_ACTIVE_UNINSTALLED;
- else
- newStatus = STATUS_ACTIVE_NEITHER;
- }
+ newStatus = STATUS_ACTIVE;
break;
default:
// Complain!
@@ -1940,6 +2035,17 @@ public class Jobs extends org.apache.man
endTransaction();
}
}
+
+  /** Check whether transMgr.checkConnectorExists() reports true for every transformation connection name given.
+  *@param transformationNames is the array of transformation connection names to check.
+  *@return true if every name passes the check (or the array is empty), false as soon as one does not.
+  */
+  protected boolean checkTransformationsInstalled(String[] transformationNames)
+    throws ManifoldCFException
+  {
+    int index = 0;
+    while (index < transformationNames.length)
+    {
+      // Stop at the first name that fails the connector check
+      if (!transMgr.checkConnectorExists(transformationNames[index]))
+        return false;
+      index++;
+    }
+    return true;
+  }
/** Put job into "deleting" state, and set the start time field.
*@param jobID is the job identifier.
@@ -2022,26 +2128,24 @@ public class Jobs extends org.apache.man
if (set.getRowCount() == 0)
throw new ManifoldCFException("Can't find job "+jobID.toString());
IResultRow row = set.getRow(0);
+
int status = stringToStatus((String)row.getValue(statusField));
int newStatus;
switch (status)
{
case STATUS_STARTINGUP:
case STATUS_STARTINGUPMINIMAL:
- if (connectionMgr.checkConnectorExists((String)row.getValue(connectionNameField)))
- {
- if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
- newStatus = STATUS_ACTIVE;
- else
- newStatus = STATUS_ACTIVE_NOOUTPUT;
- }
+ String[] transformationConnectionNames = pipelineManager.getConnectionNames(jobID);
+ String connectionName = (String)row.getValue(connectionNameField);
+ String outputName = (String)row.getValue(outputNameField);
+
+ // Need to set either STATUS_ACTIVE, or STATUS_ACTIVE_UNINSTALLED
+ if (!checkTransformationsInstalled(transformationConnectionNames) ||
+ !connectionMgr.checkConnectorExists(connectionName) ||
+ !outputMgr.checkConnectorExists(outputName))
+ newStatus = STATUS_ACTIVE_UNINSTALLED;
else
- {
- if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
- newStatus = STATUS_ACTIVE_UNINSTALLED;
- else
- newStatus = STATUS_ACTIVE_NEITHER;
- }
+ newStatus = STATUS_ACTIVE;
break;
case STATUS_ABORTINGSTARTINGUPFORRESTART:
newStatus = STATUS_ABORTINGFORRESTART;
@@ -2056,8 +2160,7 @@ public class Jobs extends org.apache.man
HashMap map = new HashMap();
map.put(statusField,statusToString(newStatus));
- if (newStatus == STATUS_ACTIVE || newStatus == STATUS_ACTIVE_UNINSTALLED ||
- newStatus == STATUS_ACTIVE_NOOUTPUT || newStatus == STATUS_ACTIVE_NEITHER)
+ if (newStatus == STATUS_ACTIVE || newStatus == STATUS_ACTIVE_UNINSTALLED)
{
map.put(startTimeField,new Long(startTime));
}
@@ -2128,12 +2231,6 @@ public class Jobs extends org.apache.man
case STATUS_ACTIVESEEDING_UNINSTALLED:
newStatus = STATUS_ACTIVE_UNINSTALLED;
break;
- case STATUS_ACTIVESEEDING_NOOUTPUT:
- newStatus = STATUS_ACTIVE_NOOUTPUT;
- break;
- case STATUS_ACTIVESEEDING_NEITHER:
- newStatus = STATUS_ACTIVE_NEITHER;
- break;
case STATUS_ACTIVEWAITSEEDING:
newStatus = STATUS_ACTIVEWAIT;
break;
@@ -2212,6 +2309,7 @@ public class Jobs extends org.apache.man
new UnitaryClause(idField,jobID)});
HashMap map = new HashMap();
map.put(statusField,statusToString(newStatus));
+ map.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
map.put(windowEndField,null);
performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
}
@@ -2258,8 +2356,6 @@ public class Jobs extends org.apache.man
case STATUS_READYFORSTARTUPMINIMAL:
case STATUS_ACTIVE:
case STATUS_ACTIVE_UNINSTALLED:
- case STATUS_ACTIVE_NOOUTPUT:
- case STATUS_ACTIVE_NEITHER:
case STATUS_ACTIVEWAIT:
case STATUS_PAUSING:
case STATUS_ACTIVEWAITING:
@@ -2272,8 +2368,6 @@ public class Jobs extends org.apache.man
break;
case STATUS_ACTIVESEEDING:
case STATUS_ACTIVESEEDING_UNINSTALLED:
- case STATUS_ACTIVESEEDING_NOOUTPUT:
- case STATUS_ACTIVESEEDING_NEITHER:
case STATUS_ACTIVEWAITSEEDING:
case STATUS_PAUSINGSEEDING:
case STATUS_ACTIVEWAITINGSEEDING:
@@ -2331,8 +2425,6 @@ public class Jobs extends org.apache.man
case STATUS_READYFORSTARTUPMINIMAL:
case STATUS_ACTIVE:
case STATUS_ACTIVE_UNINSTALLED:
- case STATUS_ACTIVE_NOOUTPUT:
- case STATUS_ACTIVE_NEITHER:
case STATUS_ACTIVEWAIT:
case STATUS_PAUSING:
case STATUS_ACTIVEWAITING:
@@ -2343,8 +2435,6 @@ public class Jobs extends org.apache.man
break;
case STATUS_ACTIVESEEDING:
case STATUS_ACTIVESEEDING_UNINSTALLED:
- case STATUS_ACTIVESEEDING_NOOUTPUT:
- case STATUS_ACTIVESEEDING_NEITHER:
case STATUS_ACTIVEWAITSEEDING:
case STATUS_PAUSINGSEEDING:
case STATUS_ACTIVEWAITINGSEEDING:
@@ -2383,8 +2473,6 @@ public class Jobs extends org.apache.man
{
case STATUS_ACTIVE:
case STATUS_ACTIVE_UNINSTALLED:
- case STATUS_ACTIVE_NOOUTPUT:
- case STATUS_ACTIVE_NEITHER:
newStatus = STATUS_PAUSING;
break;
case STATUS_ACTIVEWAITING:
@@ -2395,8 +2483,6 @@ public class Jobs extends org.apache.man
break;
case STATUS_ACTIVESEEDING:
case STATUS_ACTIVESEEDING_UNINSTALLED:
- case STATUS_ACTIVESEEDING_NOOUTPUT:
- case STATUS_ACTIVESEEDING_NEITHER:
newStatus = STATUS_PAUSINGSEEDING;
break;
case STATUS_ACTIVEWAITINGSEEDING:
@@ -2618,6 +2704,7 @@ public class Jobs extends org.apache.man
throw new ManifoldCFException("Job does not exist: "+jobID);
IResultRow row = set.getRow(0);
int status = stringToStatus(row.getValue(statusField).toString());
+ String[] transformationConnectionNames = pipelineManager.getConnectionNames(jobID);
String connectionName = (String)row.getValue(connectionNameField);
String outputName = (String)row.getValue(outputNameField);
int newStatus;
@@ -2625,37 +2712,23 @@ public class Jobs extends org.apache.man
switch (status)
{
case STATUS_RESUMING:
- if (connectionMgr.checkConnectorExists(connectionName))
- {
- if (outputMgr.checkConnectorExists(outputName))
- newStatus = STATUS_ACTIVE;
- else
- newStatus = STATUS_ACTIVE_NOOUTPUT;
- }
+ // Want either STATUS_ACTIVE or STATUS_ACTIVE_UNINSTALLED
+ if (!checkTransformationsInstalled(transformationConnectionNames) ||
+ !connectionMgr.checkConnectorExists(connectionName) ||
+ !outputMgr.checkConnectorExists(outputName))
+ newStatus = STATUS_ACTIVE_UNINSTALLED;
else
- {
- if (outputMgr.checkConnectorExists(outputName))
- newStatus = STATUS_ACTIVE_UNINSTALLED;
- else
- newStatus = STATUS_ACTIVE_NEITHER;
- }
+ newStatus = STATUS_ACTIVE;
map.put(statusField,statusToString(newStatus));
break;
case STATUS_RESUMINGSEEDING:
- if (connectionMgr.checkConnectorExists(connectionName))
- {
- if (outputMgr.checkConnectorExists(outputName))
- newStatus = STATUS_ACTIVESEEDING;
- else
- newStatus = STATUS_ACTIVESEEDING_NOOUTPUT;
- }
+ // Want either STATUS_ACTIVESEEDING or STATUS_ACTIVESEEDING_UNINSTALLED
+ if (!checkTransformationsInstalled(transformationConnectionNames) ||
+ !connectionMgr.checkConnectorExists(connectionName) ||
+ !outputMgr.checkConnectorExists(outputName))
+ newStatus = STATUS_ACTIVESEEDING_UNINSTALLED;
else
- {
- if (outputMgr.checkConnectorExists(outputName))
- newStatus = STATUS_ACTIVESEEDING_UNINSTALLED;
- else
- newStatus = STATUS_ACTIVESEEDING_NEITHER;
- }
+ newStatus = STATUS_ACTIVESEEDING;
map.put(statusField,statusToString(newStatus));
break;
default:
@@ -2810,6 +2883,21 @@ public class Jobs extends org.apache.man
return set.getRowCount() > 0;
}
+  /** Determine whether any pipeline table row refers to the given transformation connection name.
+  *@param connectionName is the transformation connection name to look for.
+  *@return true if the lookup query returns at least one row, false if it returns none.
+  */
+  public boolean checkIfTransformationReference(String connectionName)
+    throws ManifoldCFException
+  {
+    ArrayList params = new ArrayList();
+    ClauseDescription[] clauses = new ClauseDescription[]{
+      new UnitaryClause(pipelineManager.transformationNameField,connectionName)};
+    String whereClause = buildConjunctionClause(params,clauses);
+    // Only the row count matters; the owner ID column is selected simply to have something to return
+    String sql = "SELECT "+pipelineManager.ownerIDField+" FROM "+pipelineManager.getTableName()+
+      " WHERE "+whereClause;
+    IResultSet matches = performQuery(sql,params,new StringSet(getJobsKey()),null);
+    boolean referenced = matches.getRowCount() > 0;
+    return referenced;
+  }
+
/** Get the job IDs associated with a given connection name.
*@param connectionName is the name of the connection.
*@return the set of job id's associated with that connection.
@@ -3063,6 +3151,35 @@ public class Jobs extends org.apache.man
}
}
+  /** Convert the string form of an assessment state into its integer value.
+  * A null or empty string yields ASSESSMENT_KNOWN; any other string is looked up in assessmentMap,
+  * and a ManifoldCFException is thrown if it has no entry there.
+  *@param value is the string to convert; may be null or empty.
+  *@return the assessment state value.
+  */
+  public static int stringToAssessmentState(String value)
+    throws ManifoldCFException
+  {
+    if (value != null && value.length() > 0)
+    {
+      Integer mapped = assessmentMap.get(value);
+      if (mapped != null)
+        return mapped.intValue();
+      throw new ManifoldCFException("Bad assessment value: '"+value+"'");
+    }
+    // Nothing recorded: fall back to the default state
+    return ASSESSMENT_KNOWN;
+  }
+
+  /** Convert an assessment state value into its string form.
+  * ASSESSMENT_KNOWN maps to "Y" and ASSESSMENT_UNKNOWN maps to "N"; any other value
+  * causes a ManifoldCFException to be thrown.
+  *@param value is the assessment state value to convert.
+  *@return the string form of the assessment state.
+  */
+  public static String assessmentStateToString(int value)
+    throws ManifoldCFException
+  {
+    if (value == ASSESSMENT_KNOWN)
+      return "Y";
+    if (value == ASSESSMENT_UNKNOWN)
+      return "N";
+    throw new ManifoldCFException("Unknown assessment state value "+Integer.toString(value));
+  }
+
/** Go from string to hopcount mode.
*/
public static int stringToHopcountMode(String value)
@@ -3070,7 +3187,7 @@ public class Jobs extends org.apache.man
{
if (value == null || value.length() == 0)
return HOPCOUNT_ACCURATE;
- Integer x = (Integer)hopmodeMap.get(value);
+ Integer x = hopmodeMap.get(value);
if (x == null)
throw new ManifoldCFException("Bad hopcount mode value: '"+value+"'");
return x.intValue();
@@ -3214,14 +3331,12 @@ public class Jobs extends org.apache.man
throws ManifoldCFException
{
// Fetch all the jobs, but only once for each ID. Then, assign each one by id into the final array.
- HashMap uniqueIDs = new HashMap();
- int i = 0;
- while (i < ids.length)
+ Set<Long> uniqueIDs = new HashSet<Long>();
+ for (Long id : ids)
{
- uniqueIDs.put(ids[i],ids[i]);
- i++;
+ uniqueIDs.add(id);
}
- HashMap returnValues = new HashMap();
+ Map<Long,JobDescription> returnValues = new HashMap<Long,JobDescription>();
beginTransaction();
try
{
@@ -3229,8 +3344,7 @@ public class Jobs extends org.apache.man
ArrayList params = new ArrayList();
int j = 0;
int maxIn = getMaxInClause();
- Iterator iter = uniqueIDs.keySet().iterator();
- while (iter.hasNext())
+ for (Long uniqueID : uniqueIDs)
{
if (j == maxIn)
{
@@ -3242,7 +3356,7 @@ public class Jobs extends org.apache.man
if (j > 0)
sb.append(',');
sb.append('?');
- params.add((Long)iter.next());
+ params.add(uniqueID);
j++;
}
if (j > 0)
@@ -3265,15 +3379,13 @@ public class Jobs extends org.apache.man
// Build the return array
JobDescription[] rval = new JobDescription[ids.length];
- i = 0;
- while (i < rval.length)
+ for (int i = 0; i < rval.length; i++)
{
Long id = ids[i];
- JobDescription jd = (JobDescription)returnValues.get(id);
+ JobDescription jd = returnValues.get(id);
if (jd != null)
jd.makeReadOnly();
rval[i] = jd;
- i++;
}
return rval;
}
@@ -3283,7 +3395,7 @@ public class Jobs extends org.apache.man
*@param idList is the list of id's.
*@param params is the set of parameters.
*/
- protected void getJobsChunk(Map returnValues, String idList, ArrayList params)
+ protected void getJobsChunk(Map<Long,JobDescription> returnValues, String idList, ArrayList params)
throws ManifoldCFException
{
try
@@ -3321,6 +3433,7 @@ public class Jobs extends org.apache.man
}
// Fill in schedules for jobs
+ pipelineManager.getRows(returnValues,idList,params);
scheduleManager.getRows(returnValues,idList,params);
hopFilterManager.getRows(returnValues,idList,params);
forcedParamManager.getRows(returnValues,idList,params);
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ScheduleManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ScheduleManager.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ScheduleManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ScheduleManager.java Mon Jun 9 23:19:08 2014
@@ -164,7 +164,7 @@ public class ScheduleManager extends org
*@param ownerIDList is the list of owner id's.
*@param ownerIDParams is the corresponding set of owner id parameters.
*/
- public void getRows(Map returnValues, String ownerIDList, ArrayList ownerIDParams)
+ public void getRows(Map<Long,JobDescription> returnValues, String ownerIDList, ArrayList ownerIDParams)
throws ManifoldCFException
{
IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+ownerIDField+" IN ("+ownerIDList+") ORDER BY "+ordinalField+" ASC",ownerIDParams,
@@ -183,7 +183,7 @@ public class ScheduleManager extends org
(String)row.getValue(timezoneField),
(Long)row.getValue(windowDurationField),
stringToRequestMinimumValue((String)row.getValue(requestMinimumField)));
- ((JobDescription)returnValues.get(ownerID)).addScheduleRecord(sr);
+ returnValues.get(ownerID).addScheduleRecord(sr);
i++;
}
}