You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/10 01:19:10 UTC

svn commit: r1601529 [4/6] - in /manifoldcf/trunk: ./ connectors/ connectors/nulltransformation/ framework/agents/src/main/java/org/apache/manifoldcf/agents/ framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ framework/agent...

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ForcedParamManager.java Mon Jun  9 23:19:08 2014
@@ -121,9 +121,10 @@ public class ForcedParamManager extends 
   public Map<String,Set<String>> readRows(Long id)
     throws ManifoldCFException
   {
-    ArrayList list = new ArrayList();
-    list.add(id);
-    IResultSet set = performQuery("SELECT "+paramNameField+","+paramValueField+" FROM "+getTableName()+" WHERE "+ownerIDField+"=?",list,
+    ArrayList params = new ArrayList();
+    String query = buildConjunctionClause(params,new ClauseDescription[]{
+      new UnitaryClause(ownerIDField,id)});
+    IResultSet set = performQuery("SELECT "+paramNameField+","+paramValueField+" FROM "+getTableName()+" WHERE "+query,params,
       null,null);
     Map<String,Set<String>> rval = new HashMap<String,Set<String>>();
     if (set.getRowCount() == 0)
@@ -168,6 +169,34 @@ public class ForcedParamManager extends 
     }
   }
 
+  /** Compare the forced-parameter rows in the database against what is in a job description.
+  *@param ownerID is the owning identifier.
+  *@param list is the job description whose forced metadata is compared against the stored rows.
+  *@return true if both hold the same parameter names with the same value sets, false otherwise. */
+  public boolean compareRows(Long ownerID, IJobDescription list)
+    throws ManifoldCFException
+  {
+    Map<String,Set<String>> map = readRows(ownerID);
+    Map<String,Set<String>> otherMap = list.getForcedMetadata();
+    if (map.size() != otherMap.size())
+      return false;
+    for (String x : map.keySet())
+    {
+      Set<String> xValues = map.get(x);
+      Set<String> otherValues = otherMap.get(x);
+      if (otherValues == null) // parameter name is stored but absent from the job description
+        return false;
+      if (xValues.size() != otherValues.size())
+        return false;
+      for (String y : xValues) // sizes are equal, so one-way containment implies set equality
+      {
+        if (!otherValues.contains(y))
+          return false;
+      }
+    }
+    return true;
+  }
+  
   /** Write a filter list into the database.
   *@param ownerID is the owning identifier.
   *@param list is the job description to write hopcount filters for.

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopFilterManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopFilterManager.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopFilterManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopFilterManager.java Mon Jun  9 23:19:08 2014
@@ -147,7 +147,7 @@ public class HopFilterManager extends or
   *@param ownerIDList is the list of owner id's.
   *@param ownerIDParams is the corresponding set of owner id parameters.
   */
-  public void getRows(Map returnValues, String ownerIDList, ArrayList ownerIDParams)
+  public void getRows(Map<Long,JobDescription> returnValues, String ownerIDList, ArrayList ownerIDParams)
     throws ManifoldCFException
   {
     IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+ownerIDField+" IN ("+ownerIDList+")",ownerIDParams,
@@ -159,11 +159,35 @@ public class HopFilterManager extends or
       Long ownerID = (Long)row.getValue(ownerIDField);
       String linkType = (String)row.getValue(linkTypeField);
       Long maxHops = (Long)row.getValue(maxHopsField);
-      ((JobDescription)returnValues.get(ownerID)).addHopCountFilter(linkType,maxHops);
+      returnValues.get(ownerID).addHopCountFilter(linkType,maxHops);
       i++;
     }
   }
 
+  /** Compare a filter list against what's in a job description.
+  *@param ownerID is the owning identifier.
+  *@param list is the job description whose hopcount filters are compared against the stored rows.
+  *@return true if both hold the same link types with equal, non-null counts, false otherwise. */
+  public boolean compareRows(Long ownerID, IJobDescription list)
+    throws ManifoldCFException
+  {
+    // Compare hopcount filter criteria.
+    Map filterRows = readRows(ownerID);
+    Map newFilterRows = list.getHopCountFilters();
+    if (filterRows.size() != newFilterRows.size())
+      return false;
+    for (String linkType : (Collection<String>)filterRows.keySet())
+    {
+      Long oldCount = (Long)filterRows.get(linkType);
+      Long newCount = (Long)newFilterRows.get(linkType);
+      if (oldCount == null || newCount == null) // a missing or null count on either side is a mismatch
+        return false;
+      if (oldCount.longValue() != newCount.longValue())
+        return false;
+    }
+    return true;
+  }
+  
   /** Write a filter list into the database.
   *@param ownerID is the owning identifier.
   *@param list is the job description to write hopcount filters for.

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobDescription.java Mon Jun  9 23:19:08 2014
@@ -44,6 +44,7 @@ public class JobDescription implements I
   protected String description = null;
   protected String outputConnectionName = null;
   protected String connectionName = null;
+  protected final List<PipelineStage> pipelineStages = new ArrayList<PipelineStage>();
   protected int type = TYPE_CONTINUOUS;
   protected int startMethod = START_WINDOWBEGIN;
   protected int priority = 5;
@@ -96,7 +97,15 @@ public class JobDescription implements I
     rval.id = id;
     rval.isNew = isNew;
     rval.outputConnectionName = outputConnectionName;
+    // Direct modification of this object is possible - so it also has to know if it is read-only!!
+    rval.outputSpecification = outputSpecification.duplicate(readOnly);
     rval.connectionName = connectionName;
+    // Direct modification of this object is possible - so it also has to know if it is read-only!!
+    rval.documentSpecification = documentSpecification.duplicate(readOnly);
+    for (PipelineStage pipelineStage : pipelineStages)
+    {
+      rval.pipelineStages.add(new PipelineStage(pipelineStage.getConnectionName(),pipelineStage.getDescription(),pipelineStage.getSpecification().duplicate(readOnly)));
+    }
     rval.description = description;
     rval.type = type;
     // No direct modification of this object is possible
@@ -124,10 +133,6 @@ public class JobDescription implements I
         rval.addForcedMetadataValue(forcedParamName,value);
       }
     }
-    // Direct modification of this object is possible - so it also has to know if it is read-only!!
-    rval.outputSpecification = outputSpecification.duplicate(readOnly);
-    // Direct modification of this object is possible - so it also has to know if it is read-only!!
-    rval.documentSpecification = documentSpecification.duplicate(readOnly);
     rval.readOnly = readOnly;
     return rval;
   }
@@ -242,6 +247,91 @@ public class JobDescription implements I
     return connectionName;
   }
 
+  /** Clear pipeline connections */
+  @Override
+  public void clearPipeline()
+  {
+    if (readOnly)
+      throw new IllegalStateException("Attempt to change read-only object");
+    pipelineStages.clear();
+  }
+  
+  /** Add a pipeline connection.
+  *@param pipelineStageConnectionName is the name of the pipeline connection to add.
+  *@param pipelineStageDescription is a description of the pipeline stage being added.
+  *@return the empty output specification for this pipeline stage.
+  */
+  @Override
+  public OutputSpecification addPipelineStage(String pipelineStageConnectionName, String pipelineStageDescription)
+  {
+    if (readOnly)
+      throw new IllegalStateException("Attempt to change read-only object");
+    PipelineStage ps = new PipelineStage(pipelineStageConnectionName,pipelineStageDescription);
+    pipelineStages.add(ps);
+    return ps.getSpecification();
+  }
+  
+  /** Get a count of pipeline stages */
+  @Override
+  public int countPipelineStages()
+  {
+    return pipelineStages.size();
+  }
+  
+  /** Insert a new pipeline stage.
+  *@param index is the index to insert pipeline stage before
+  *@param pipelineStageConnectionName is the connection name.
+  *@param pipelineStageDescription is the description.
+  *@return the newly-created output specification.
+  */
+  public OutputSpecification insertPipelineStage(int index, String pipelineStageConnectionName, String pipelineStageDescription)
+  {
+    if (readOnly)
+      throw new IllegalStateException("Attempt to change read-only object");
+    PipelineStage ps = new PipelineStage(pipelineStageConnectionName,pipelineStageDescription);
+    pipelineStages.add(index,ps);
+    return ps.getSpecification();
+  }
+  
+  /** Get a specific pipeline connection name.
+  *@param index is the index of the pipeline stage whose connection name to get.
+  *@return the name of the connection.
+  */
+  @Override
+  public String getPipelineStageConnectionName(int index)
+  {
+    return pipelineStages.get(index).getConnectionName();
+  }
+  
+  /** Get a specific pipeline stage description.
+  *@param index is the index of the pipeline stage whose description to get.
+  *@return the description of the pipeline stage.
+  */
+  @Override
+  public String getPipelineStageDescription(int index)
+  {
+    return pipelineStages.get(index).getDescription();
+  }
+
+  /** Get a specific pipeline stage specification.
+  *@param index is the index of the pipeline stage whose specification is needed.
+  *@return the specification for the connection.
+  */
+  @Override
+  public OutputSpecification getPipelineStageSpecification(int index)
+  {
+    return pipelineStages.get(index).getSpecification();
+  }
+
+  /** Delete a pipeline stage; throws IllegalStateException if this job description is read-only.
+  *@param index is the index of the pipeline stage to delete. */
+  @Override
+  public void deletePipelineStage(int index)
+  {
+    if (readOnly) throw new IllegalStateException("Attempt to change read-only object");
+    pipelineStages.remove(index);
+  }
+
   /** Set the job type.
   *@param type is the type (as an integer).
   */
@@ -544,4 +634,40 @@ public class JobDescription implements I
     rval.add(value);
   }
 
+  protected static class PipelineStage
+  {
+    protected final String connectionName;
+    protected final String description;
+    protected final OutputSpecification specification;
+    
+    public PipelineStage(String connectionName, String description)
+    {
+      this.connectionName = connectionName;
+      this.description = description;
+      this.specification = new OutputSpecification();
+    }
+
+    public PipelineStage(String connectionName, String description, OutputSpecification spec)
+    {
+      this.connectionName = connectionName;
+      this.description = description;
+      this.specification = spec;
+    }
+    
+    public OutputSpecification getSpecification()
+    {
+      return specification;
+    }
+    
+    public String getConnectionName()
+    {
+      return connectionName;
+    }
+    
+    public String getDescription()
+    {
+      return description;
+    }
+  }
+  
 }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Jun  9 23:19:08 2014
@@ -45,6 +45,12 @@ public class JobManager implements IJobM
   protected final IDBInterface database;
   protected final IOutputConnectionManager outputMgr;
   protected final IRepositoryConnectionManager connectionMgr;
+  protected final ITransformationConnectionManager transformationMgr;
+  
+  protected final IOutputConnectorManager outputConnectorMgr;
+  protected final IConnectorManager connectorMgr;
+  protected final ITransformationConnectorManager transformationConnectorMgr;
+  
   protected final IRepositoryConnectorPool repositoryConnectorPool;
   protected final ILockManager lockManager;
   protected final IThreadContext threadContext;
@@ -73,6 +79,10 @@ public class JobManager implements IJobM
     eventManager = new EventManager(database);
     outputMgr = OutputConnectionManagerFactory.make(threadContext);
     connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+    transformationMgr = TransformationConnectionManagerFactory.make(threadContext);
+    outputConnectorMgr = OutputConnectorManagerFactory.make(threadContext);
+    connectorMgr = ConnectorManagerFactory.make(threadContext);
+    transformationConnectorMgr = TransformationConnectorManagerFactory.make(threadContext);
     repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
     lockManager = LockManagerFactory.make(threadContext);
   }
@@ -83,7 +93,9 @@ public class JobManager implements IJobM
   public void install()
     throws ManifoldCFException
   {
-    jobs.install(outputMgr.getTableName(),outputMgr.getConnectionNameColumn(),connectionMgr.getTableName(),connectionMgr.getConnectionNameColumn());
+    jobs.install(transformationMgr.getTableName(),transformationMgr.getConnectionNameColumn(),
+      outputMgr.getTableName(),outputMgr.getConnectionNameColumn(),
+      connectionMgr.getTableName(),connectionMgr.getConnectionNameColumn());
     jobQueue.install(jobs.getTableName(),jobs.idField);
     hopCount.install(jobs.getTableName(),jobs.idField);
     carryDown.install(jobs.getTableName(),jobs.idField);
@@ -109,16 +121,14 @@ public class JobManager implements IJobM
     throws java.io.IOException, ManifoldCFException
   {
     // Write a version indicator
-    ManifoldCF.writeDword(os,3);
+    ManifoldCF.writeDword(os,4);
     // Get the job list
     IJobDescription[] list = getAllJobs();
     // Write the number of authorities
     ManifoldCF.writeDword(os,list.length);
     // Loop through the list and write the individual repository connection info
-    int i = 0;
-    while (i < list.length)
+    for (IJobDescription job : list)
     {
-      IJobDescription job = list[i++];
       ManifoldCF.writeString(os,job.getConnectionName());
       ManifoldCF.writeString(os,job.getOutputConnectionName());
       ManifoldCF.writeString(os,job.getDescription());
@@ -135,10 +145,9 @@ public class JobManager implements IJobM
       // Write schedule
       int recCount = job.getScheduleRecordCount();
       ManifoldCF.writeDword(os,recCount);
-      int j = 0;
-      while (j < recCount)
+      for (int j = 0; j < recCount; j++)
       {
-        ScheduleRecord sr = job.getScheduleRecord(j++);
+        ScheduleRecord sr = job.getScheduleRecord(j);
         writeEnumeratedValues(os,sr.getDayOfWeek());
         writeEnumeratedValues(os,sr.getMonthOfYear());
         writeEnumeratedValues(os,sr.getDayOfMonth());
@@ -161,6 +170,29 @@ public class JobManager implements IJobM
         ManifoldCF.writeString(os,linkType);
         ManifoldCF.writeLong(os,hopcount);
       }
+      
+      // Write forced metadata information
+      Map<String,Set<String>> forcedMetadata = job.getForcedMetadata();
+      ManifoldCF.writeDword(os,forcedMetadata.size());
+      for (String key : forcedMetadata.keySet())
+      {
+        ManifoldCF.writeString(os,key);
+        Set<String> values = forcedMetadata.get(key);
+        ManifoldCF.writeDword(os,values.size());
+        for (String value : values)
+        {
+          ManifoldCF.writeString(os,value);
+        }
+      }
+      
+      // Write pipeline information
+      ManifoldCF.writeDword(os,job.countPipelineStages());
+      for (int j = 0; j < job.countPipelineStages(); j++)
+      {
+        ManifoldCF.writeString(os,job.getPipelineStageConnectionName(j));
+        ManifoldCF.writeString(os,job.getPipelineStageDescription(j));
+        ManifoldCF.writeString(os,job.getPipelineStageSpecification(j).toXML());
+      }
     }
   }
 
@@ -187,11 +219,10 @@ public class JobManager implements IJobM
     throws java.io.IOException, ManifoldCFException
   {
     int version = ManifoldCF.readDword(is);
-    if (version != 2 && version != 3)
+    if (version != 2 && version != 3 && version != 4)
       throw new java.io.IOException("Unknown job configuration version: "+Integer.toString(version));
     int count = ManifoldCF.readDword(is);
-    int i = 0;
-    while (i < count)
+    for (int i = 0; i < count; i++)
     {
       IJobDescription job = createJob();
 
@@ -210,8 +241,7 @@ public class JobManager implements IJobM
 
       // Read schedule
       int recCount = ManifoldCF.readDword(is);
-      int j = 0;
-      while (j < recCount)
+      for (int j = 0; j < recCount; j++)
       {
         EnumeratedValues dayOfWeek = readEnumeratedValues(is);
         EnumeratedValues monthOfYear = readEnumeratedValues(is);
@@ -230,23 +260,45 @@ public class JobManager implements IJobM
         ScheduleRecord sr = new ScheduleRecord(dayOfWeek, monthOfYear, dayOfMonth, year,
           hourOfDay, minutesOfHour, timezone, duration, requestMinimum);
         job.addScheduleRecord(sr);
-        j++;
       }
 
       // Read hop count filters
       int hopFilterCount = ManifoldCF.readDword(is);
-      j = 0;
-      while (j < hopFilterCount)
+      for (int j = 0; j < hopFilterCount; j++)
       {
         String linkType = ManifoldCF.readString(is);
         Long hopcount = ManifoldCF.readLong(is);
         job.addHopCountFilter(linkType,hopcount);
-        j++;
       }
 
+      if (version >= 4)
+      {
+        // Read forced metadata information
+        int paramCount = ManifoldCF.readDword(is);
+        for (int j = 0; j < paramCount; j++)
+        {
+          String key = ManifoldCF.readString(is);
+          int valueCount = ManifoldCF.readDword(is);
+          for (int k = 0; k < valueCount; k++)
+          {
+            String value = ManifoldCF.readString(is);
+            job.addForcedMetadataValue(key,value);
+          }
+        }
+        
+        // Read pipeline information
+        int pipelineCount = ManifoldCF.readDword(is);
+        for (int j = 0; j < pipelineCount; j++)
+        {
+          String connectionName = ManifoldCF.readString(is);
+          String description = ManifoldCF.readString(is);
+          String specification = ManifoldCF.readString(is);
+          job.addPipelineStage(connectionName,description).fromXML(specification);
+        }
+      }
+      
       // Attempt to save this job
       save(job);
-      i++;
     }
   }
 
@@ -275,7 +327,7 @@ public class JobManager implements IJobM
     throws ManifoldCFException
   {
     // For each connection, find the corresponding list of jobs.  From these jobs, we want the job id and the status.
-    ArrayList list = new ArrayList();
+    List<String> list = new ArrayList<String>();
     int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
     int currentCount = 0;
     int i = 0;
@@ -297,7 +349,7 @@ public class JobManager implements IJobM
 
   /** Note deregistration for a batch of connection names.
   */
-  protected void noteConnectionDeregistration(ArrayList list)
+  protected void noteConnectionDeregistration(List<String> list)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -327,7 +379,7 @@ public class JobManager implements IJobM
     throws ManifoldCFException
   {
     // For each connection, find the corresponding list of jobs.  From these jobs, we want the job id and the status.
-    ArrayList list = new ArrayList();
+    List<String> list = new ArrayList<String>();
     int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
     int currentCount = 0;
     int i = 0;
@@ -349,7 +401,7 @@ public class JobManager implements IJobM
 
   /** Note registration for a batch of connection names.
   */
-  protected void noteConnectionRegistration(ArrayList list)
+  protected void noteConnectionRegistration(List<String> list)
     throws ManifoldCFException
   {
     // Query for the matching jobs, and then for each job potentially adjust the state
@@ -369,17 +421,6 @@ public class JobManager implements IJobM
     }
   }
 
-  /** Note a change in connection configuration.
-  * This method will be called whenever a connection's configuration is modified, or when an external repository change
-  * is signalled.
-  */
-  @Override
-  public void noteConnectionChange(String connectionName)
-    throws ManifoldCFException
-  {
-    jobs.noteConnectionChange(connectionName);
-  }
-
   /**  Note the deregistration of an output connector used by the specified connections.
   * This method will be called when the connector is deregistered.  Jobs that use these connections
   *  must therefore enter appropriate states.
@@ -390,7 +431,7 @@ public class JobManager implements IJobM
     throws ManifoldCFException
   {
     // For each connection, find the corresponding list of jobs.  From these jobs, we want the job id and the status.
-    ArrayList list = new ArrayList();
+    List<String> list = new ArrayList<String>();
     int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
     int currentCount = 0;
     int i = 0;
@@ -412,7 +453,7 @@ public class JobManager implements IJobM
 
   /** Note deregistration for a batch of output connection names.
   */
-  protected void noteOutputConnectionDeregistration(ArrayList list)
+  protected void noteOutputConnectionDeregistration(List<String> list)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -442,7 +483,7 @@ public class JobManager implements IJobM
     throws ManifoldCFException
   {
     // For each connection, find the corresponding list of jobs.  From these jobs, we want the job id and the status.
-    ArrayList list = new ArrayList();
+    List<String> list = new ArrayList<String>();
     int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
     int currentCount = 0;
     int i = 0;
@@ -464,7 +505,7 @@ public class JobManager implements IJobM
 
   /** Note registration for a batch of output connection names.
   */
-  protected void noteOutputConnectionRegistration(ArrayList list)
+  protected void noteOutputConnectionRegistration(List<String> list)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -484,6 +525,129 @@ public class JobManager implements IJobM
     }
   }
 
+  /**  Note the deregistration of a transformation connector used by the specified connections.
+  * This method will be called when the connector is deregistered.  Jobs that use these connections
+  *  must therefore enter appropriate states.
+  *@param connectionNames is the set of connection names.
+  */
+  @Override
+  public void noteTransformationConnectorDeregistration(String[] connectionNames)
+    throws ManifoldCFException
+  {
+    // Process the connection names in batches no larger than the database's conjunction clause maximum.
+    List<String> list = new ArrayList<String>();
+    int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
+    int currentCount = 0;
+    int i = 0;
+    while (i < connectionNames.length)
+    {
+      if (currentCount == maxCount)
+      {
+        noteTransformationConnectionDeregistration(list);
+        list.clear();
+        currentCount = 0;
+      }
+      // Full batches are flushed above, through the transformation handler, before the next name is added.
+      list.add(connectionNames[i++]);
+      currentCount++;
+    }
+    if (currentCount > 0)
+      noteTransformationConnectionDeregistration(list);
+  }
+
+  /** Note deregistration for a batch of transformation connection names.
+  *@param list is the batch of transformation connection names to process. */
+  protected void noteTransformationConnectionDeregistration(List<String> list)
+    throws ManifoldCFException
+  {
+    // Find the jobs that use any of these transformation connections, then adjust each job's state
+    Long[] jobIDs = jobs.findJobsMatchingTransformations(list);
+    StringBuilder query = new StringBuilder();
+    ArrayList newList = new ArrayList();
+    
+    query.append("SELECT ").append(jobs.idField).append(",").append(jobs.statusField)
+      .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+      .append(database.buildConjunctionClause(newList,new ClauseDescription[]{
+        new MultiClause(jobs.idField,jobIDs)}))
+      .append(" FOR UPDATE");
+    IResultSet set = database.performQuery(query.toString(),newList,null,null);
+    int i = 0;
+    while (i < set.getRowCount())
+    {
+      IResultRow row = set.getRow(i++);
+      Long jobID = (Long)row.getValue(jobs.idField);
+      int statusValue = jobs.stringToStatus((String)row.getValue(jobs.statusField)); // stored as a string; decode to the int status code
+      jobs.noteTransformationConnectorDeregistration(jobID,statusValue);
+    }
+  }
+
+  /** Note the registration of a transformation connector used by the specified connections.
+  * This method will be called when a connector is registered, on which the specified
+  * connections depend.
+  *@param connectionNames is the set of connection names.
+  */
+  @Override
+  public void noteTransformationConnectorRegistration(String[] connectionNames)
+    throws ManifoldCFException
+  {
+    // Process the connection names in batches no larger than the database's conjunction clause maximum.
+    List<String> list = new ArrayList<String>();
+    int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
+    int currentCount = 0;
+    int i = 0;
+    while (i < connectionNames.length)
+    {
+      if (currentCount == maxCount)
+      {
+        noteTransformationConnectionRegistration(list);
+        list.clear();
+        currentCount = 0;
+      }
+      // Full batches are flushed above, through the transformation registration handler, before the next name is added.
+      list.add(connectionNames[i++]);
+      currentCount++;
+    }
+    if (currentCount > 0)
+      noteTransformationConnectionRegistration(list);
+  }
+
+  /** Note registration for a batch of transformation connection names.
+  *@param list is the batch of transformation connection names to process. */
+  protected void noteTransformationConnectionRegistration(List<String> list)
+    throws ManifoldCFException
+  {
+    // Find the jobs that use any of these transformation connections, then adjust each job's state
+    Long[] jobIDs = jobs.findJobsMatchingTransformations(list);
+    StringBuilder query = new StringBuilder();
+    ArrayList newList = new ArrayList();
+    
+    query.append("SELECT ").append(jobs.idField).append(",").append(jobs.statusField)
+      .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+      .append(database.buildConjunctionClause(newList,new ClauseDescription[]{
+        new MultiClause(jobs.idField,jobIDs)}))
+      .append(" FOR UPDATE");
+    IResultSet set = database.performQuery(query.toString(),newList,null,null);
+    int i = 0;
+    while (i < set.getRowCount())
+    {
+      IResultRow row = set.getRow(i++);
+      Long jobID = (Long)row.getValue(jobs.idField);
+      int statusValue = jobs.stringToStatus((String)row.getValue(jobs.statusField)); // stored as a string; decode to the int status code
+      jobs.noteTransformationConnectorRegistration(jobID,statusValue);
+    }
+  }
+
+  /** Note a change in connection configuration.
+  * This method will be called whenever a connection's configuration is modified, or when an external repository change
+  * is signalled.
+  */
+  @Override
+  public void noteConnectionChange(String connectionName)
+    throws ManifoldCFException
+  {
+    jobs.noteConnectionChange(connectionName);
+  }
+
   /** Note a change in output connection configuration.
   * This method will be called whenever a connection's configuration is modified, or when an external target config change
   * is signalled.
@@ -495,6 +659,48 @@ public class JobManager implements IJobM
     jobs.noteOutputConnectionChange(connectionName);
   }
 
+  /** Note a change in transformation connection configuration.
+  * This method will be called whenever a connection's configuration is modified.
+  */
+  @Override
+  public void noteTransformationConnectionChange(String connectionName)
+    throws ManifoldCFException
+  {
+    jobs.noteTransformationConnectionChange(connectionName);
+  }
+
+  /** Assess jobs marked to be in need of assessment for connector status changes.
+  */
+  public void assessMarkedJobs()
+    throws ManifoldCFException
+  {
+    database.beginTransaction();
+    try
+    {
+      // Query for all jobs marked "ASSESSMENT_UNKNOWN".
+      jobs.assessMarkedJobs();
+    }
+    catch (ManifoldCFException e)
+    {
+      database.signalRollback();
+      throw e;
+    }
+    catch (RuntimeException e)
+    {
+      database.signalRollback();
+      throw e;
+    }
+    catch (Error e)
+    {
+      database.signalRollback();
+      throw e;
+    }
+    finally
+    {
+      database.endTransaction();
+    }
+  }
+
   /** Load a sorted list of job descriptions.
   *@return the list, sorted by description.
   */
@@ -545,9 +751,7 @@ public class JobManager implements IJobM
       IResultRow row = set.getRow(0);
       int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString());
       if (status == jobs.STATUS_ACTIVE || status == jobs.STATUS_ACTIVESEEDING ||
-        status == jobs.STATUS_ACTIVE_UNINSTALLED || status == jobs.STATUS_ACTIVESEEDING_UNINSTALLED ||
-        status == jobs.STATUS_ACTIVE_NOOUTPUT || status == jobs.STATUS_ACTIVESEEDING_NOOUTPUT ||
-        status == jobs.STATUS_ACTIVE_NEITHER || status == jobs.STATUS_ACTIVESEEDING_NEITHER)
+        status == jobs.STATUS_ACTIVE_UNINSTALLED || status == jobs.STATUS_ACTIVESEEDING_UNINSTALLED)
       throw new ManifoldCFException("Job "+id+" is active; you must shut it down before deleting it");
       if (status != jobs.STATUS_INACTIVE)
         throw new ManifoldCFException("Job "+id+" is busy; you must wait and/or shut it down before deleting it");
@@ -560,6 +764,11 @@ public class JobManager implements IJobM
       database.signalRollback();
       throw e;
     }
+    catch (RuntimeException e)
+    {
+      database.signalRollback();
+      throw e;
+    }
     catch (Error e)
     {
       database.signalRollback();
@@ -628,6 +837,17 @@ public class JobManager implements IJobM
     return jobs.checkIfOutputReference(connectionName);
   }
 
+  /** See if there's a reference to a transformation connection name.
+  *@param connectionName is the name of the connection.
+  *@return true if there is a reference, false otherwise.
+  */
+  @Override
+  public boolean checkIfTransformationReference(String connectionName)
+    throws ManifoldCFException
+  {
+    return jobs.checkIfTransformationReference(connectionName);
+  }
+
   /** Get the job IDs associated with a given connection name.
   *@param connectionName is the name of the connection.
   *@return the set of job id's associated with that connection.
@@ -1932,11 +2152,7 @@ public class JobManager implements IJobM
           Jobs.statusToString(Jobs.STATUS_ACTIVE),
           Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING),
           Jobs.statusToString(Jobs.STATUS_ACTIVE_UNINSTALLED),
-          Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED),
-          Jobs.statusToString(Jobs.STATUS_ACTIVE_NOOUTPUT),
-          Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NOOUTPUT),
-          Jobs.statusToString(Jobs.STATUS_ACTIVE_NEITHER),
-          Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NEITHER)
+          Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED)
           }),
         new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)}))
       .append(") ");
@@ -2582,7 +2798,39 @@ public class JobManager implements IJobM
       true)).append(" ")
       .append(database.constructOffsetLimitClause(0,1,true));
 
-    IResultSet set = database.performQuery(sb.toString(),list,null,null,1,null);
+    IResultSet set;
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        set = database.performQuery(sb.toString(),list,null,null,1,null);
+        break;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction adding document bins: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+      }
+    }
+
     if (set.getRowCount() > 0)
     {
       IResultRow row = set.getRow(0);
@@ -2659,6 +2907,8 @@ public class JobManager implements IJobM
       "t0."+jobQueue.docPriorityField, "t0."+jobQueue.statusField, "t0."+jobQueue.checkActionField, "t0."+jobQueue.checkTimeField},
       true)).append(" ");
 
+    String query = sb.toString();
+
     // Before entering the transaction, we must provide the throttlelimit object with all the connector
     // instances it could possibly need.  The purpose for doing this is to prevent a deadlock where
     // connector starvation causes database lockup.
@@ -2700,7 +2950,7 @@ public class JobManager implements IJobM
 
           // Now we can tack the limit onto the query.  Before this point, remainingDocuments would be crap
           int limitValue = vList.getRemainingDocuments();
-          sb.append(database.constructOffsetLimitClause(0,limitValue,true));
+          String finalQuery = query + database.constructOffsetLimitClause(0,limitValue,true);
 
           if (Logging.perf.isDebugEnabled())
           {
@@ -2711,7 +2961,7 @@ public class JobManager implements IJobM
           database.beginTransaction();
           try
           {
-            IResultSet set = database.performQuery(sb.toString(),list,null,null,-1,vList);
+            IResultSet set = database.performQuery(finalQuery,list,null,null,-1,vList);
 
             if (Logging.perf.isDebugEnabled())
               Logging.perf.debug(" Queuing "+Integer.toString(set.getRowCount())+" documents");
@@ -5634,10 +5884,6 @@ public class JobManager implements IJobM
             jobs.statusToString(jobs.STATUS_ACTIVESEEDING),
             jobs.statusToString(jobs.STATUS_ACTIVE_UNINSTALLED),
             jobs.statusToString(jobs.STATUS_ACTIVESEEDING_UNINSTALLED),
-            jobs.statusToString(jobs.STATUS_ACTIVE_NOOUTPUT),
-            jobs.statusToString(jobs.STATUS_ACTIVESEEDING_NOOUTPUT),
-            jobs.statusToString(jobs.STATUS_ACTIVE_NEITHER),
-            jobs.statusToString(jobs.STATUS_ACTIVESEEDING_NEITHER),
             jobs.statusToString(jobs.STATUS_PAUSED),
             jobs.statusToString(jobs.STATUS_PAUSEDSEEDING)})})).append(" AND ")
         .append(jobs.windowEndField).append("<? FOR UPDATE");
@@ -5661,8 +5907,6 @@ public class JobManager implements IJobM
         {
         case Jobs.STATUS_ACTIVE:
         case Jobs.STATUS_ACTIVE_UNINSTALLED:
-        case Jobs.STATUS_ACTIVE_NOOUTPUT:
-        case Jobs.STATUS_ACTIVE_NEITHER:
           jobs.waitJob(jobID,Jobs.STATUS_ACTIVEWAITING);
           if (Logging.jobs.isDebugEnabled())
           {
@@ -5671,8 +5915,6 @@ public class JobManager implements IJobM
           break;
         case Jobs.STATUS_ACTIVESEEDING:
         case Jobs.STATUS_ACTIVESEEDING_UNINSTALLED:
-        case Jobs.STATUS_ACTIVESEEDING_NOOUTPUT:
-        case Jobs.STATUS_ACTIVESEEDING_NEITHER:
           jobs.waitJob(jobID,Jobs.STATUS_ACTIVEWAITINGSEEDING);
           if (Logging.jobs.isDebugEnabled())
           {
@@ -7157,20 +7399,6 @@ public class JobManager implements IJobM
           // Set the state of the job back to "Active"
           jobs.writePermanentStatus(jobID,jobs.STATUS_ACTIVE_UNINSTALLED);
           break;
-        case Jobs.STATUS_ACTIVESEEDING_NOOUTPUT:
-          if (Logging.jobs.isDebugEnabled())
-            Logging.jobs.debug("Setting job "+jobID+" back to 'Active_NoOutput' state");
-
-          // Set the state of the job back to "Active"
-          jobs.writePermanentStatus(jobID,jobs.STATUS_ACTIVE_NOOUTPUT);
-          break;
-        case Jobs.STATUS_ACTIVESEEDING_NEITHER:
-          if (Logging.jobs.isDebugEnabled())
-            Logging.jobs.debug("Setting job "+jobID+" back to 'Active_Neither' state");
-
-          // Set the state of the job back to "Active"
-          jobs.writePermanentStatus(jobID,jobs.STATUS_ACTIVE_NEITHER);
-          break;
         case Jobs.STATUS_ACTIVESEEDING:
           if (Logging.jobs.isDebugEnabled())
             Logging.jobs.debug("Setting job "+jobID+" back to 'Active' state");
@@ -7221,8 +7449,6 @@ public class JobManager implements IJobM
         case Jobs.STATUS_ABORTINGFORRESTARTMINIMAL:
         case Jobs.STATUS_ACTIVE:
         case Jobs.STATUS_ACTIVE_UNINSTALLED:
-        case Jobs.STATUS_ACTIVE_NOOUTPUT:
-        case Jobs.STATUS_ACTIVE_NEITHER:
         case Jobs.STATUS_PAUSED:
         case Jobs.STATUS_ACTIVEWAIT:
         case Jobs.STATUS_PAUSEDWAIT:
@@ -7399,9 +7625,7 @@ public class JobManager implements IJobM
             new MultiClause(jobs.statusField,new Object[]{
               jobs.statusToString(jobs.STATUS_ACTIVE),
               jobs.statusToString(jobs.STATUS_ACTIVEWAIT),
-              jobs.statusToString(jobs.STATUS_ACTIVE_UNINSTALLED),
-              jobs.statusToString(jobs.STATUS_ACTIVE_NOOUTPUT),
-              jobs.statusToString(jobs.STATUS_ACTIVE_NEITHER)})}))
+              jobs.statusToString(jobs.STATUS_ACTIVE_UNINSTALLED)})}))
           .append(" FOR UPDATE");
         
         IResultSet set = database.performQuery(sb.toString(),list,null,null);
@@ -7930,10 +8154,6 @@ public class JobManager implements IJobM
         Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING),
         Jobs.statusToString(Jobs.STATUS_ACTIVE_UNINSTALLED),
         Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED),
-        Jobs.statusToString(Jobs.STATUS_ACTIVE_NOOUTPUT),
-        Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NOOUTPUT),
-        Jobs.statusToString(Jobs.STATUS_ACTIVE_NEITHER),
-        Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_NEITHER),
         Jobs.statusToString(Jobs.STATUS_PAUSED),
         Jobs.statusToString(Jobs.STATUS_PAUSEDSEEDING),
         Jobs.statusToString(Jobs.STATUS_ACTIVEWAIT),
@@ -8066,10 +8286,6 @@ public class JobManager implements IJobM
         break;
       case Jobs.STATUS_ACTIVE_UNINSTALLED:
       case Jobs.STATUS_ACTIVESEEDING_UNINSTALLED:
-      case Jobs.STATUS_ACTIVE_NOOUTPUT:
-      case Jobs.STATUS_ACTIVESEEDING_NOOUTPUT:
-      case Jobs.STATUS_ACTIVE_NEITHER:
-      case Jobs.STATUS_ACTIVESEEDING_NEITHER:
         rstatus = JobStatus.JOBSTATUS_RUNNING_UNINSTALLED;
         break;
       case Jobs.STATUS_ACTIVE:

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Mon Jun  9 23:19:08 2014
@@ -57,6 +57,7 @@ import java.util.*;
  * <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
  * <tr><td>failtime</td><td>BIGINT</td><td></td></tr>
  * <tr><td>failcount</td><td>BIGINT</td><td></td></tr>
+ * <tr><td>assessmentstate</td><td>CHAR(1)</td><td></td></tr>
  * </table>
  * <br><br>
  * 
@@ -124,12 +125,18 @@ public class Jobs extends org.apache.man
   // connector exists before deciding what state to put the job into.
   public static final int STATUS_ACTIVE_UNINSTALLED = 38;               // Active, but repository connector not installed
   public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 39;   // Active and seeding, but repository connector not installed
-  public static final int STATUS_ACTIVE_NOOUTPUT = 40;                  // Active, but output connector not installed
-  public static final int STATUS_ACTIVESEEDING_NOOUTPUT = 41;       // Active and seeding, but output connector not installed
-  public static final int STATUS_ACTIVE_NEITHER = 42;                     // Active, but neither repository connector nor output connector installed
-  public static final int STATUS_ACTIVESEEDING_NEITHER = 43;          // Active and seeding, but neither repository connector nor output connector installed
-  public static final int STATUS_DELETING_NOOUTPUT = 44;                // Job is being deleted but there's no output connector installed
+  public static final int STATUS_DELETING_NOOUTPUT = 40;                // Job is being deleted but there's no output connector installed
 
+  // Deprecated states.  These states should never be used; they're defined only for upgrade purposes
+  public static final int STATUS_ACTIVE_NOOUTPUT = 100;                  // Active, but output connector not installed
+  public static final int STATUS_ACTIVESEEDING_NOOUTPUT = 101;       // Active and seeding, but output connector not installed
+  public static final int STATUS_ACTIVE_NEITHER = 102;                     // Active, but neither repository connector nor output connector installed
+  public static final int STATUS_ACTIVESEEDING_NEITHER = 103;          // Active and seeding, but neither repository connector nor output connector installed
+
+  // Connector assessment states: whether the job's connector-installed state is known or needs assessment
+  public static final int ASSESSMENT_KNOWN = 0;                         // State is known.
+  public static final int ASSESSMENT_UNKNOWN = 1;                       // State is unknown, and job needs assessment
+  
   // Type field values
   public static final int TYPE_CONTINUOUS = IJobDescription.TYPE_CONTINUOUS;
   public static final int TYPE_SPECIFIED = IJobDescription.TYPE_SPECIFIED;
@@ -192,14 +199,18 @@ public class Jobs extends org.apache.man
   public static final String failTimeField = "failtime";
   /** When non-null, indicates the number of retries remaining, after which the attempt will be considered to have actually failed */
   public static final String failCountField = "failcount";
-
-  protected static Map statusMap;
-  protected static Map typeMap;
-  protected static Map startMap;
-  protected static Map hopmodeMap;
+  /** Set to "N" (ASSESSMENT_UNKNOWN) when the job needs connector-installed assessment */
+  public static final String assessmentStateField = "assessmentstate";
+  
+  protected static Map<String,Integer> statusMap;
+  protected static Map<String,Integer> typeMap;
+  protected static Map<String,Integer> startMap;
+  protected static Map<String,Integer> hopmodeMap;
+  protected static Map<String,Integer> assessmentMap;
+  
   static
   {
-    statusMap = new HashMap();
+    statusMap = new HashMap<String,Integer>();
     statusMap.put("N",new Integer(STATUS_INACTIVE));
     statusMap.put("A",new Integer(STATUS_ACTIVE));
     statusMap.put("P",new Integer(STATUS_PAUSED));
@@ -247,19 +258,23 @@ public class Jobs extends org.apache.man
     statusMap.put("u",new Integer(STATUS_ACTIVESEEDING_NEITHER));
     statusMap.put("D",new Integer(STATUS_DELETING_NOOUTPUT));
     
-    typeMap = new HashMap();
+    typeMap = new HashMap<String,Integer>();
     typeMap.put("C",new Integer(TYPE_CONTINUOUS));
     typeMap.put("S",new Integer(TYPE_SPECIFIED));
 
-    startMap = new HashMap();
+    startMap = new HashMap<String,Integer>();
     startMap.put("B",new Integer(START_WINDOWBEGIN));
     startMap.put("I",new Integer(START_WINDOWINSIDE));
     startMap.put("D",new Integer(START_DISABLE));
 
-    hopmodeMap = new HashMap();
+    hopmodeMap = new HashMap<String,Integer>();
     hopmodeMap.put("A",new Integer(HOPCOUNT_ACCURATE));
     hopmodeMap.put("N",new Integer(HOPCOUNT_NODELETE));
     hopmodeMap.put("V",new Integer(HOPCOUNT_NEVERDELETE));
+    
+    assessmentMap = new HashMap<String,Integer>();
+    assessmentMap.put("Y",new Integer(ASSESSMENT_KNOWN));
+    assessmentMap.put("N",new Integer(ASSESSMENT_UNKNOWN));
   }
 
   /* Transient vs. non-transient states
@@ -321,15 +336,17 @@ public class Jobs extends org.apache.man
   */
   
   // Local variables
-  protected ICacheManager cacheManager;
-  protected ScheduleManager scheduleManager;
-  protected HopFilterManager hopFilterManager;
-  protected ForcedParamManager forcedParamManager;
-
-  protected IOutputConnectionManager outputMgr;
-  protected IRepositoryConnectionManager connectionMgr;
-
-  protected IThreadContext threadContext;
+  protected final ICacheManager cacheManager;
+  protected final ScheduleManager scheduleManager;
+  protected final HopFilterManager hopFilterManager;
+  protected final ForcedParamManager forcedParamManager;
+  protected final PipelineManager pipelineManager;
+  
+  protected final IOutputConnectionManager outputMgr;
+  protected final IRepositoryConnectionManager connectionMgr;
+  protected final ITransformationConnectionManager transMgr;
+  
+  protected final IThreadContext threadContext;
   
   /** Constructor.
   *@param database is the database handle.
@@ -342,16 +359,20 @@ public class Jobs extends org.apache.man
     scheduleManager = new ScheduleManager(threadContext,database);
     hopFilterManager = new HopFilterManager(threadContext,database);
     forcedParamManager = new ForcedParamManager(threadContext,database);
+    pipelineManager = new PipelineManager(threadContext,database);
     
     cacheManager = CacheManagerFactory.make(threadContext);
 
     outputMgr = OutputConnectionManagerFactory.make(threadContext);
     connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+    transMgr = TransformationConnectionManagerFactory.make(threadContext);
   }
 
   /** Install or upgrade this table.
   */
-  public void install(String outputTableName, String outputNameField, String connectionTableName, String connectionNameField)
+  public void install(String transTableName, String transNameField,
+    String outputTableName, String outputNameField,
+    String connectionTableName, String connectionNameField)
     throws ManifoldCFException
   {
     // Standard practice: Have a loop around everything, in case upgrade needs it.
@@ -386,6 +407,7 @@ public class Jobs extends org.apache.man
         map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
         map.put(failTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
         map.put(failCountField,new ColumnDescription("BIGINT",false,true,null,null,false));
+        map.put(assessmentStateField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
         performCreate(map,null);
       }
       else
@@ -415,9 +437,33 @@ public class Jobs extends org.apache.man
           insertMap.put(failCountField,new ColumnDescription("BIGINT",false,true,null,null,false));
           performAlter(insertMap,null,null,null);
         }
+        if (existing.get(assessmentStateField) == null)
+        {
+          Map insertMap = new HashMap();
+          insertMap.put(assessmentStateField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
+          performAlter(insertMap,null,null,null);
+          String query;
+          ArrayList list = new ArrayList();
+          HashMap map = new HashMap();
+          query = buildConjunctionClause(list,new ClauseDescription[]{
+            new MultiClause(statusField,new Object[]{
+              statusToString(STATUS_ACTIVE_NOOUTPUT),
+              statusToString(STATUS_ACTIVE_NEITHER)})});
+          map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
+          performUpdate(map,"WHERE "+query,list,null);
+          list.clear();
+          map.clear();
+          query = buildConjunctionClause(list,new ClauseDescription[]{
+            new MultiClause(statusField,new Object[]{
+              statusToString(STATUS_ACTIVESEEDING_NOOUTPUT),
+              statusToString(STATUS_ACTIVESEEDING_NEITHER)})});
+          map.put(statusField,statusToString(STATUS_ACTIVESEEDING_UNINSTALLED));
+          performUpdate(map,"WHERE "+query,list,null);
+        }
       }
 
       // Handle related tables
+      pipelineManager.install(getTableName(),idField,transTableName,transNameField);
       scheduleManager.install(getTableName(),idField);
       hopFilterManager.install(getTableName(),idField);
       forcedParamManager.install(getTableName(),idField);
@@ -480,6 +526,7 @@ public class Jobs extends org.apache.man
       forcedParamManager.deinstall();
       hopFilterManager.deinstall();
       scheduleManager.deinstall();
+      pipelineManager.deinstall();
       performDrop(null);
     }
     catch (ManifoldCFException e)
@@ -516,6 +563,27 @@ public class Jobs extends org.apache.man
     analyzeTable();
   }
 
+  /** Find the IDs of all jobs matching the specified transformation connection names.
+  */
+  public Long[] findJobsMatchingTransformations(List<String> transformationConnectionNames)
+    throws ManifoldCFException
+  {
+    StringBuilder query = new StringBuilder();
+    ArrayList params = new ArrayList();
+    query.append("SELECT ").append(idField)
+      .append(" FROM ").append(getTableName()).append(" t1 WHERE EXISTS(");
+    pipelineManager.buildQueryClause(query,params,"t1."+idField,transformationConnectionNames);
+    query.append(")");
+    IResultSet set = performQuery(query.toString(),params,null,null);
+    Long[] rval = new Long[set.getRowCount()];
+    for (int i = 0; i < rval.length; i++)
+    {
+      IResultRow row = set.getRow(i);
+      rval[i] = (Long)row.getValue(idField);
+    }
+    return rval;
+  }
+  
   /** Read schedule records for a specified set of jobs.  Cannot use caching!
   */
   public ScheduleRecord[][] readScheduleRecords(Long[] jobIDs)
@@ -837,12 +905,19 @@ public class Jobs extends org.apache.man
 
               IResultRow row = set.getRow(0);
 
-              boolean isSame = true;
-
               // Determine whether we need to reset the scan time for documents.
               // Basically, any change to job parameters that could affect ingestion should clear isSame so that we
               // relook at all the documents, not just the recent ones.
 
+              boolean isSame = pipelineManager.compareRows(id,jobDescription);
+              if (!isSame)
+              {
+                int currentStatus = stringToStatus((String)row.getValue(statusField));
+                if (currentStatus == STATUS_ACTIVE || currentStatus == STATUS_ACTIVESEEDING ||
+                  currentStatus == STATUS_ACTIVE_UNINSTALLED || currentStatus == STATUS_ACTIVESEEDING_UNINSTALLED)
+                  values.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
+              }
+
               if (isSame)
               {
                 String oldOutputSpecXML = (String)row.getValue(outputSpecField);
@@ -858,32 +933,11 @@ public class Jobs extends org.apache.man
               }
 
               if (isSame)
-              {
-                // Compare hopcount filter criteria.
-                Map filterRows = hopFilterManager.readRows(id);
-                Map newFilterRows = jobDescription.getHopCountFilters();
-                if (filterRows.size() != newFilterRows.size())
-                  isSame = false;
-                else
-                {
-                  for (String linkType : (Collection<String>)filterRows.keySet())
-                  {
-                    Long oldCount = (Long)filterRows.get(linkType);
-                    Long newCount = (Long)newFilterRows.get(linkType);
-                    if (oldCount == null || newCount == null)
-                    {
-                      isSame = false;
-                      break;
-                    }
-                    else if (oldCount.longValue() != newCount.longValue())
-                    {
-                      isSame = false;
-                      break;
-                    }
-                  }
-                }
-              }
-              
+                isSame = hopFilterManager.compareRows(id,jobDescription);
+
+              if (isSame)
+                isSame = forcedParamManager.compareRows(id,jobDescription);
+
               if (!isSame)
                 values.put(lastCheckTimeField,null);
 
@@ -891,6 +945,7 @@ public class Jobs extends org.apache.man
               query = buildConjunctionClause(params,new ClauseDescription[]{
                 new UnitaryClause(idField,id)});
               performUpdate(values," WHERE "+query,params,null);
+              pipelineManager.deleteRows(id);
               scheduleManager.deleteRows(id);
               hopFilterManager.deleteRows(id);
               forcedParamManager.deleteRows(id);
@@ -907,6 +962,8 @@ public class Jobs extends org.apache.man
               performInsert(values,null);
             }
 
+            // Write pipeline rows
+            pipelineManager.writeRows(id,jobDescription);
             // Write schedule records
             scheduleManager.writeRows(id,jobDescription);
             // Write hop filter rows
@@ -1112,19 +1169,6 @@ public class Jobs extends org.apache.man
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
     list.clear();
-    query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT)),
-      new UnitaryClause(processIDField,processID)});
-    map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
-    map.put(processIDField,null);
-    performUpdate(map,"WHERE "+query,list,invKey);
-    list.clear();
-    query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER)),
-      new UnitaryClause(processIDField,processID)});
-    map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
-    map.put(processIDField,null);
-    performUpdate(map,"WHERE "+query,list,invKey);
 
   }
 
@@ -1256,18 +1300,6 @@ public class Jobs extends org.apache.man
     map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
-    list.clear();
-    query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT))});
-    map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
-    map.put(processIDField,null);
-    performUpdate(map,"WHERE "+query,list,invKey);
-    list.clear();
-    query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER))});
-    map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
-    map.put(processIDField,null);
-    performUpdate(map,"WHERE "+query,list,invKey);
 
   }
 
@@ -1282,11 +1314,29 @@ public class Jobs extends org.apache.man
     performUpdate(map,"WHERE "+query,list,null);
   }
 
-  /** Signal to a job that its underlying output connector has gone away.
+  /** Invalidate current state with respect to installed connectors, as a result of registration.
+  */
+  public void invalidateCurrentUnregisteredState(Long jobID, int oldStatusValue)
+    throws ManifoldCFException
+  {
+    // If we are in a state that cares about the connector state, then we have to signal we need an assessment.
+    if (oldStatusValue == STATUS_ACTIVE_UNINSTALLED || oldStatusValue == STATUS_ACTIVESEEDING_UNINSTALLED || oldStatusValue == STATUS_DELETING_NOOUTPUT)
+    {
+      // Assessment state is not cached, so no cache invalidation needed
+      ArrayList list = new ArrayList();
+      String query = buildConjunctionClause(list,new ClauseDescription[]{
+        new UnitaryClause(idField,jobID)});
+      HashMap map = new HashMap();
+      map.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
+      performUpdate(map,"WHERE "+query,list,null);
+    }
+  }
+  
+  /** Signal to a job that an underlying transformation connector has gone away.
   *@param jobID is the identifier of the job.
   *@param oldStatusValue is the current status value for the job.
   */
-  public void noteOutputConnectorDeregistration(Long jobID, int oldStatusValue)
+  public void noteTransformationConnectorDeregistration(Long jobID, int oldStatusValue)
     throws ManifoldCFException
   {
     int newStatusValue;
@@ -1299,19 +1349,10 @@ public class Jobs extends org.apache.man
     switch (oldStatusValue)
     {
     case STATUS_ACTIVE:
-      newStatusValue = STATUS_ACTIVE_NOOUTPUT;
+      newStatusValue = STATUS_ACTIVE_UNINSTALLED;
       break;
     case STATUS_ACTIVESEEDING:
-      newStatusValue = STATUS_ACTIVESEEDING_NOOUTPUT;
-      break;
-    case STATUS_ACTIVE_UNINSTALLED:
-      newStatusValue = STATUS_ACTIVE_NEITHER;
-      break;
-    case STATUS_ACTIVESEEDING_UNINSTALLED:
-      newStatusValue = STATUS_ACTIVESEEDING_NEITHER;
-      break;
-    case STATUS_DELETING:
-      newStatusValue = STATUS_DELETING_NOOUTPUT;
+      newStatusValue = STATUS_ACTIVESEEDING_UNINSTALLED;
       break;
     default:
       newStatusValue = oldStatusValue;
@@ -1324,38 +1365,47 @@ public class Jobs extends org.apache.man
 
     HashMap newValues = new HashMap();
     newValues.put(statusField,statusToString(newStatusValue));
+    newValues.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(idField,jobID)});
     performUpdate(newValues,"WHERE "+query,list,invKey);
   }
 
-  /** Signal to a job that its underlying output connector has returned.
+  /** Signal to a job that an underlying transformation connector has been registered.
   *@param jobID is the identifier of the job.
   *@param oldStatusValue is the current status value for the job.
   */
-  public void noteOutputConnectorRegistration(Long jobID, int oldStatusValue)
+  public void noteTransformationConnectorRegistration(Long jobID, int oldStatusValue)
+    throws ManifoldCFException
+  {
+    invalidateCurrentUnregisteredState(jobID,oldStatusValue);
+  }
+
+  /** Signal to a job that its underlying output connector has gone away.
+  *@param jobID is the identifier of the job.
+  *@param oldStatusValue is the current status value for the job.
+  */
+  public void noteOutputConnectorDeregistration(Long jobID, int oldStatusValue)
     throws ManifoldCFException
   {
     int newStatusValue;
-    // The following states are special, in that when the underlying connector returns, the jobs
-    // in such states are switched back to their connector-installed value.
+    // The following states are special, in that when the underlying connector goes away, the jobs
+    // in such states are switched away to something else.  There are TWO reasons that a state may be in
+    // this category: EITHER we don't want the job in this state to be treated in the same way if its
+    // connector is uninstalled, OR we need feedback for the user interface.  If it's the latter situation,
+    // then all usages of the corresponding states will be identical - and that is in fact precisely the
+    // point we start from in all the code.
     switch (oldStatusValue)
     {
-    case STATUS_ACTIVE_NOOUTPUT:
-      newStatusValue = STATUS_ACTIVE;
-      break;
-    case STATUS_ACTIVESEEDING_NOOUTPUT:
-      newStatusValue = STATUS_ACTIVESEEDING;
-      break;
-    case STATUS_ACTIVE_NEITHER:
+    case STATUS_ACTIVE:
       newStatusValue = STATUS_ACTIVE_UNINSTALLED;
       break;
-    case STATUS_ACTIVESEEDING_NEITHER:
+    case STATUS_ACTIVESEEDING:
       newStatusValue = STATUS_ACTIVESEEDING_UNINSTALLED;
       break;
-    case STATUS_DELETING_NOOUTPUT:
-      newStatusValue = STATUS_DELETING;
+    case STATUS_DELETING:
+      newStatusValue = STATUS_DELETING_NOOUTPUT;
       break;
     default:
       newStatusValue = oldStatusValue;
@@ -1368,12 +1418,38 @@ public class Jobs extends org.apache.man
 
     HashMap newValues = new HashMap();
     newValues.put(statusField,statusToString(newStatusValue));
+    newValues.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(idField,jobID)});
     performUpdate(newValues,"WHERE "+query,list,invKey);
   }
+  
+  /** Signal to a job that its underlying output connector has returned.
+  *@param jobID is the identifier of the job.
+  *@param oldStatusValue is the current status value for the job.
+  */
+  public void noteOutputConnectorRegistration(Long jobID, int oldStatusValue)
+    throws ManifoldCFException
+  {
+    if (oldStatusValue == STATUS_DELETING_NOOUTPUT)
+    {
+      // We can do the state transition now.
+      StringSet invKey = new StringSet(getJobStatusKey());
 
+      HashMap newValues = new HashMap();
+      newValues.put(statusField,statusToString(STATUS_DELETING));
+      newValues.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
+      ArrayList list = new ArrayList();
+      String query = buildConjunctionClause(list,new ClauseDescription[]{
+        new UnitaryClause(idField,jobID)});
+      performUpdate(newValues,"WHERE "+query,list,invKey);
+      return;
+    }
+    // Otherwise, we don't know the state, and can't do the work now because we'd deadlock
+    invalidateCurrentUnregisteredState(jobID,oldStatusValue);
+  }
+  
   /** Signal to a job that its underlying connector has gone away.
   *@param jobID is the identifier of the job.
   *@param oldStatusValue is the current status value for the job.
@@ -1396,12 +1472,6 @@ public class Jobs extends org.apache.man
     case STATUS_ACTIVESEEDING:
       newStatusValue = STATUS_ACTIVESEEDING_UNINSTALLED;
       break;
-    case STATUS_ACTIVE_NOOUTPUT:
-      newStatusValue = STATUS_ACTIVE_NEITHER;
-      break;
-    case STATUS_ACTIVESEEDING_NOOUTPUT:
-      newStatusValue = STATUS_ACTIVESEEDING_NEITHER;
-      break;
     default:
       newStatusValue = oldStatusValue;
       break;
@@ -1413,12 +1483,13 @@ public class Jobs extends org.apache.man
 
     HashMap newValues = new HashMap();
     newValues.put(statusField,statusToString(newStatusValue));
+    newValues.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(idField,jobID)});
     performUpdate(newValues,"WHERE "+query,list,invKey);
   }
-
+  
   /** Signal to a job that its underlying connector has returned.
   *@param jobID is the identifier of the job.
   *@param oldStatusValue is the current status value for the job.
@@ -1426,40 +1497,9 @@ public class Jobs extends org.apache.man
   public void noteConnectorRegistration(Long jobID, int oldStatusValue)
     throws ManifoldCFException
   {
-    int newStatusValue;
-    // The following states are special, in that when the underlying connector returns, the jobs
-    // in such states are switched back to their connector-installed value.
-    switch (oldStatusValue)
-    {
-    case STATUS_ACTIVE_UNINSTALLED:
-      newStatusValue = STATUS_ACTIVE;
-      break;
-    case STATUS_ACTIVESEEDING_UNINSTALLED:
-      newStatusValue = STATUS_ACTIVESEEDING;
-      break;
-    case STATUS_ACTIVE_NEITHER:
-      newStatusValue = STATUS_ACTIVE_NOOUTPUT;
-      break;
-    case STATUS_ACTIVESEEDING_NEITHER:
-      newStatusValue = STATUS_ACTIVESEEDING_NOOUTPUT;
-      break;
-    default:
-      newStatusValue = oldStatusValue;
-      break;
-    }
-    if (newStatusValue == oldStatusValue)
-      return;
-
-    StringSet invKey = new StringSet(getJobStatusKey());
-
-    HashMap newValues = new HashMap();
-    newValues.put(statusField,statusToString(newStatusValue));
-    ArrayList list = new ArrayList();
-    String query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(idField,jobID)});
-    performUpdate(newValues,"WHERE "+query,list,invKey);
+    invalidateCurrentUnregisteredState(jobID,oldStatusValue);
   }
-
+  
   /** Note a change in connection configuration.
   * This method will be called whenever a connection's configuration is modified, or when an external repository change
   * is signalled.
@@ -1491,6 +1531,22 @@ public class Jobs extends org.apache.man
       new UnitaryClause(outputNameField,connectionName)});
     performUpdate(newValues,"WHERE "+query,list,null);
   }
+
+  /** Note a transformation connection configuration change by clearing the last-check time of each job using it.
+  *@param connectionName is the name of the transformation connection whose configuration changed.
+  */
+  public void noteTransformationConnectionChange(String connectionName)
+    throws ManifoldCFException
+  {
+    // Only the last-check time is reset, not the status, so no cache keys need invalidation.
+    ArrayList params = new ArrayList();
+    String pipelineMatch = buildConjunctionClause(params,new ClauseDescription[]{
+      new JoinClause(getTableName()+"."+idField,pipelineManager.ownerIDField),
+      new UnitaryClause(pipelineManager.transformationNameField,connectionName)});
+    HashMap updates = new HashMap();
+    updates.put(lastCheckTimeField,null);
+    performUpdate(updates,"WHERE EXISTS(SELECT 'x' FROM "+pipelineManager.getTableName()+" WHERE "+pipelineMatch+")",params,null);
+  }
   
   /** Check whether a job's status indicates that it is in ACTIVE or ACTIVESEEDING state.
   */
@@ -1683,20 +1739,6 @@ public class Jobs extends org.apache.man
     map.put(statusField,statusToString(STATUS_ACTIVE_UNINSTALLED));
     map.put(processIDField,null);
     performUpdate(map,"WHERE "+query,list,invKey);
-    list.clear();
-    query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NOOUTPUT)),
-      new UnitaryClause(processIDField,processID)});
-    map.put(statusField,statusToString(STATUS_ACTIVE_NOOUTPUT));
-    map.put(processIDField,null);
-    performUpdate(map,"WHERE "+query,list,invKey);
-    list.clear();
-    query = buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(statusField,statusToString(STATUS_ACTIVESEEDING_NEITHER)),
-      new UnitaryClause(processIDField,processID)});
-    map.put(statusField,statusToString(STATUS_ACTIVE_NEITHER));
-    map.put(processIDField,null);
-    performUpdate(map,"WHERE "+query,list,invKey);
 
   }
 
@@ -1769,12 +1811,6 @@ public class Jobs extends org.apache.man
       case STATUS_ACTIVESEEDING_UNINSTALLED:
         newStatus = STATUS_ACTIVE_UNINSTALLED;
         break;
-      case STATUS_ACTIVESEEDING_NOOUTPUT:
-        newStatus = STATUS_ACTIVE_NOOUTPUT;
-        break;
-      case STATUS_ACTIVESEEDING_NEITHER:
-        newStatus = STATUS_ACTIVE_NEITHER;
-        break;
       case STATUS_ACTIVEWAITSEEDING:
         newStatus = STATUS_ACTIVEWAIT;
         break;
@@ -1873,6 +1909,67 @@ public class Jobs extends org.apache.man
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }
 
+  /** Assess all marked jobs to determine if they can be reactivated.
+  * Each job whose assessment state is unknown is checked: when every connector it needs
+  * exists it leaves its "uninstalled"/"no output" status, otherwise its status is kept.
+  * Either way the job is then marked as assessed, and the scan continues with the next job.
+  */
+  public void assessMarkedJobs()
+    throws ManifoldCFException
+  {
+    ArrayList newList = new ArrayList();
+    String query = buildConjunctionClause(newList,new ClauseDescription[]{
+      new UnitaryClause(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN))});
+    // Query for the matching jobs, and then for each job potentially adjust the state based on the connector status
+    IResultSet set = performQuery("SELECT "+idField+","+statusField+","+connectionNameField+","+outputNameField+" FROM "+
+      getTableName()+" WHERE "+query+" FOR UPDATE",
+      newList,null,null);
+    for (int i = 0; i < set.getRowCount(); i++)
+    {
+      IResultRow row = set.getRow(i);
+      Long jobID = (Long)row.getValue(idField);
+      String outputName = (String)row.getValue(outputNameField);
+      String connectionName = (String)row.getValue(connectionNameField);
+      String[] transformationNames = pipelineManager.getConnectionNames(jobID);
+      int statusValue = stringToStatus((String)row.getValue(statusField));
+      int newValue;
+      // Based on status value, see what we need to know to determine if the state can be switched.
+      // A job that cannot be switched keeps its status; it must not end the scan of the other jobs.
+      switch (statusValue)
+      {
+      case STATUS_DELETING_NOOUTPUT:
+        newValue = outputMgr.checkConnectorExists(outputName)?STATUS_DELETING:statusValue;
+        break;
+      case STATUS_ACTIVE_UNINSTALLED:
+        if (outputMgr.checkConnectorExists(outputName) &&
+          connectionMgr.checkConnectorExists(connectionName) &&
+          checkTransformationsInstalled(transformationNames))
+          newValue = STATUS_ACTIVE;
+        else
+          newValue = statusValue;
+        break;
+      case STATUS_ACTIVESEEDING_UNINSTALLED:
+        if (outputMgr.checkConnectorExists(outputName) &&
+          connectionMgr.checkConnectorExists(connectionName) &&
+          checkTransformationsInstalled(transformationNames))
+          newValue = STATUS_ACTIVESEEDING;
+        else
+          newValue = statusValue;
+        break;
+      default:
+        newValue = statusValue;
+      }
+      // Persist the (possibly unchanged) status together with the assessment flag.
+      ArrayList list = new ArrayList();
+      String jobQuery = buildConjunctionClause(list,new ClauseDescription[]{
+        new UnitaryClause(idField,jobID)});
+      HashMap map = new HashMap();
+      map.put(statusField,statusToString(newValue));
+      map.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
+      performUpdate(map,"WHERE "+jobQuery,list,new StringSet(getJobStatusKey()));
+    }
+  }
+  
   /** Put job back into active state, from the shutting-down state.
   *@param jobID is the job identifier.
   */
@@ -1889,26 +1986,24 @@ public class Jobs extends org.apache.man
         query+" FOR UPDATE",list,null,null);
       if (set.getRowCount() == 0)
         throw new ManifoldCFException("Can't find job "+jobID.toString());
+      
       IResultRow row = set.getRow(0);
+      
       int status = stringToStatus((String)row.getValue(statusField));
       int newStatus;
       switch (status)
       {
       case STATUS_SHUTTINGDOWN:
-        if (connectionMgr.checkConnectorExists((String)row.getValue(connectionNameField)))
-        {
-          if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
-            newStatus = STATUS_ACTIVE;
-          else
-            newStatus = STATUS_ACTIVE_NOOUTPUT;
-        }
+        String[] transformationConnectionNames = pipelineManager.getConnectionNames(jobID);
+        String connectionName = (String)row.getValue(connectionNameField);
+        String outputName = (String)row.getValue(outputNameField);
+        // Want either STATUS_ACTIVE, or STATUS_ACTIVE_UNINSTALLED
+        if (!checkTransformationsInstalled(transformationConnectionNames) ||
+          !connectionMgr.checkConnectorExists(connectionName) ||
+          !outputMgr.checkConnectorExists(outputName))
+          newStatus = STATUS_ACTIVE_UNINSTALLED;
         else
-        {
-          if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
-            newStatus = STATUS_ACTIVE_UNINSTALLED;
-          else
-            newStatus = STATUS_ACTIVE_NEITHER;
-        }
+          newStatus = STATUS_ACTIVE;
         break;
       default:
         // Complain!
@@ -1940,6 +2035,17 @@ public class Jobs extends org.apache.man
       endTransaction();
     }
   }
+
+  /** Check whether every one of the named transformation connections has a connector that exists (true for none).
+  */
+  protected boolean checkTransformationsInstalled(String[] transformationNames)
+    throws ManifoldCFException
+  {
+    for (int i = 0; i < transformationNames.length; i++)
+      if (!transMgr.checkConnectorExists(transformationNames[i]))
+        return false;
+    return true;
+  }
   
   /** Put job into "deleting" state, and set the start time field.
   *@param jobID is the job identifier.
@@ -2022,26 +2128,24 @@ public class Jobs extends org.apache.man
       if (set.getRowCount() == 0)
         throw new ManifoldCFException("Can't find job "+jobID.toString());
       IResultRow row = set.getRow(0);
+      
       int status = stringToStatus((String)row.getValue(statusField));
       int newStatus;
       switch (status)
       {
       case STATUS_STARTINGUP:
       case STATUS_STARTINGUPMINIMAL:
-        if (connectionMgr.checkConnectorExists((String)row.getValue(connectionNameField)))
-        {
-          if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
-            newStatus = STATUS_ACTIVE;
-          else
-            newStatus = STATUS_ACTIVE_NOOUTPUT;
-        }
+        String[] transformationConnectionNames = pipelineManager.getConnectionNames(jobID);
+        String connectionName = (String)row.getValue(connectionNameField);
+        String outputName = (String)row.getValue(outputNameField);
+
+        // Need to set either STATUS_ACTIVE, or STATUS_ACTIVE_UNINSTALLED
+        if (!checkTransformationsInstalled(transformationConnectionNames) ||
+          !connectionMgr.checkConnectorExists(connectionName) ||
+          !outputMgr.checkConnectorExists(outputName))
+          newStatus = STATUS_ACTIVE_UNINSTALLED;
         else
-        {
-          if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
-            newStatus = STATUS_ACTIVE_UNINSTALLED;
-          else
-            newStatus = STATUS_ACTIVE_NEITHER;
-        }
+          newStatus = STATUS_ACTIVE;
         break;
       case STATUS_ABORTINGSTARTINGUPFORRESTART:
         newStatus = STATUS_ABORTINGFORRESTART;
@@ -2056,8 +2160,7 @@ public class Jobs extends org.apache.man
 
       HashMap map = new HashMap();
       map.put(statusField,statusToString(newStatus));
-      if (newStatus == STATUS_ACTIVE || newStatus == STATUS_ACTIVE_UNINSTALLED ||
-        newStatus == STATUS_ACTIVE_NOOUTPUT || newStatus == STATUS_ACTIVE_NEITHER)
+      if (newStatus == STATUS_ACTIVE || newStatus == STATUS_ACTIVE_UNINSTALLED)
       {
         map.put(startTimeField,new Long(startTime));
       }
@@ -2128,12 +2231,6 @@ public class Jobs extends org.apache.man
       case STATUS_ACTIVESEEDING_UNINSTALLED:
         newStatus = STATUS_ACTIVE_UNINSTALLED;
         break;
-      case STATUS_ACTIVESEEDING_NOOUTPUT:
-        newStatus = STATUS_ACTIVE_NOOUTPUT;
-        break;
-      case STATUS_ACTIVESEEDING_NEITHER:
-        newStatus = STATUS_ACTIVE_NEITHER;
-        break;
       case STATUS_ACTIVEWAITSEEDING:
         newStatus = STATUS_ACTIVEWAIT;
         break;
@@ -2212,6 +2309,7 @@ public class Jobs extends org.apache.man
       new UnitaryClause(idField,jobID)});
     HashMap map = new HashMap();
     map.put(statusField,statusToString(newStatus));
+    map.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
     map.put(windowEndField,null);
     performUpdate(map,"WHERE "+query,list,new StringSet(getJobStatusKey()));
   }
@@ -2258,8 +2356,6 @@ public class Jobs extends org.apache.man
     case STATUS_READYFORSTARTUPMINIMAL:
     case STATUS_ACTIVE:
     case STATUS_ACTIVE_UNINSTALLED:
-    case STATUS_ACTIVE_NOOUTPUT:
-    case STATUS_ACTIVE_NEITHER:
     case STATUS_ACTIVEWAIT:
     case STATUS_PAUSING:
     case STATUS_ACTIVEWAITING:
@@ -2272,8 +2368,6 @@ public class Jobs extends org.apache.man
       break;
     case STATUS_ACTIVESEEDING:
     case STATUS_ACTIVESEEDING_UNINSTALLED:
-    case STATUS_ACTIVESEEDING_NOOUTPUT:
-    case STATUS_ACTIVESEEDING_NEITHER:
     case STATUS_ACTIVEWAITSEEDING:
     case STATUS_PAUSINGSEEDING:
     case STATUS_ACTIVEWAITINGSEEDING:
@@ -2331,8 +2425,6 @@ public class Jobs extends org.apache.man
     case STATUS_READYFORSTARTUPMINIMAL:
     case STATUS_ACTIVE:
     case STATUS_ACTIVE_UNINSTALLED:
-    case STATUS_ACTIVE_NOOUTPUT:
-    case STATUS_ACTIVE_NEITHER:
     case STATUS_ACTIVEWAIT:
     case STATUS_PAUSING:
     case STATUS_ACTIVEWAITING:
@@ -2343,8 +2435,6 @@ public class Jobs extends org.apache.man
       break;
     case STATUS_ACTIVESEEDING:
     case STATUS_ACTIVESEEDING_UNINSTALLED:
-    case STATUS_ACTIVESEEDING_NOOUTPUT:
-    case STATUS_ACTIVESEEDING_NEITHER:
     case STATUS_ACTIVEWAITSEEDING:
     case STATUS_PAUSINGSEEDING:
     case STATUS_ACTIVEWAITINGSEEDING:
@@ -2383,8 +2473,6 @@ public class Jobs extends org.apache.man
     {
     case STATUS_ACTIVE:
     case STATUS_ACTIVE_UNINSTALLED:
-    case STATUS_ACTIVE_NOOUTPUT:
-    case STATUS_ACTIVE_NEITHER:
       newStatus = STATUS_PAUSING;
       break;
     case STATUS_ACTIVEWAITING:
@@ -2395,8 +2483,6 @@ public class Jobs extends org.apache.man
       break;
     case STATUS_ACTIVESEEDING:
     case STATUS_ACTIVESEEDING_UNINSTALLED:
-    case STATUS_ACTIVESEEDING_NOOUTPUT:
-    case STATUS_ACTIVESEEDING_NEITHER:
       newStatus = STATUS_PAUSINGSEEDING;
       break;
     case STATUS_ACTIVEWAITINGSEEDING:
@@ -2618,6 +2704,7 @@ public class Jobs extends org.apache.man
         throw new ManifoldCFException("Job does not exist: "+jobID);
       IResultRow row = set.getRow(0);
       int status = stringToStatus(row.getValue(statusField).toString());
+      String[] transformationConnectionNames = pipelineManager.getConnectionNames(jobID);
       String connectionName = (String)row.getValue(connectionNameField);
       String outputName = (String)row.getValue(outputNameField);
       int newStatus;
@@ -2625,37 +2712,23 @@ public class Jobs extends org.apache.man
       switch (status)
       {
       case STATUS_RESUMING:
-        if (connectionMgr.checkConnectorExists(connectionName))
-        {
-          if (outputMgr.checkConnectorExists(outputName))
-            newStatus = STATUS_ACTIVE;
-          else
-            newStatus = STATUS_ACTIVE_NOOUTPUT;
-        }
+        // Want either STATUS_ACTIVE or STATUS_ACTIVE_UNINSTALLED
+        if (!checkTransformationsInstalled(transformationConnectionNames) ||
+          !connectionMgr.checkConnectorExists(connectionName) ||
+          !outputMgr.checkConnectorExists(outputName))
+          newStatus = STATUS_ACTIVE_UNINSTALLED;
         else
-        {
-          if (outputMgr.checkConnectorExists(outputName))
-            newStatus = STATUS_ACTIVE_UNINSTALLED;
-          else
-            newStatus = STATUS_ACTIVE_NEITHER;
-        }
+          newStatus = STATUS_ACTIVE;
         map.put(statusField,statusToString(newStatus));
         break;
       case STATUS_RESUMINGSEEDING:
-        if (connectionMgr.checkConnectorExists(connectionName))
-        {
-          if (outputMgr.checkConnectorExists(outputName))
-            newStatus = STATUS_ACTIVESEEDING;
-          else
-            newStatus = STATUS_ACTIVESEEDING_NOOUTPUT;
-        }
+        // Want either STATUS_ACTIVESEEDING or STATUS_ACTIVESEEDING_UNINSTALLED
+        if (!checkTransformationsInstalled(transformationConnectionNames) ||
+          !connectionMgr.checkConnectorExists(connectionName) ||
+          !outputMgr.checkConnectorExists(outputName))
+          newStatus = STATUS_ACTIVESEEDING_UNINSTALLED;
         else
-        {
-          if (outputMgr.checkConnectorExists(outputName))
-            newStatus = STATUS_ACTIVESEEDING_UNINSTALLED;
-          else
-            newStatus = STATUS_ACTIVESEEDING_NEITHER;
-        }
+          newStatus = STATUS_ACTIVESEEDING;
         map.put(statusField,statusToString(newStatus));
         break;
       default:
@@ -2810,6 +2883,21 @@ public class Jobs extends org.apache.man
     return set.getRowCount() > 0;
   }
 
+  /** Determine whether any job pipeline refers to the given transformation connection.
+  *@param connectionName is the transformation connection name to look for.
+  *@return true if at least one pipeline row names that connection, false otherwise.
+  */
+  public boolean checkIfTransformationReference(String connectionName)
+    throws ManifoldCFException
+  {
+    ArrayList params = new ArrayList();
+    String nameMatch = buildConjunctionClause(params,new ClauseDescription[]{
+      new UnitaryClause(pipelineManager.transformationNameField,connectionName)});
+    String sql = "SELECT "+pipelineManager.ownerIDField+" FROM "+pipelineManager.getTableName()+" WHERE "+nameMatch;
+    StringSet cacheKeys = new StringSet(getJobsKey());
+    return performQuery(sql,params,cacheKeys,null).getRowCount() > 0;
+  }
+
   /** Get the job IDs associated with a given connection name.
   *@param connectionName is the name of the connection.
   *@return the set of job id's associated with that connection.
@@ -3063,6 +3151,35 @@ public class Jobs extends org.apache.man
     }
   }
 
+  /** Convert the string form of an assessment state to its constant (null or empty means ASSESSMENT_KNOWN).
+  */
+  public static int stringToAssessmentState(String value)
+    throws ManifoldCFException
+  {
+    if (value == null || value.length() == 0)
+      return ASSESSMENT_KNOWN;
+    Integer state = assessmentMap.get(value);
+    if (state != null)
+      return state.intValue();
+    throw new ManifoldCFException("Bad assessment value: '"+value+"'");
+  }
+  
+  /** Convert an assessment state constant to its string form.
+  *@param value is ASSESSMENT_KNOWN or ASSESSMENT_UNKNOWN.
+  *@return "Y" for ASSESSMENT_KNOWN, "N" for ASSESSMENT_UNKNOWN.
+  *@throws ManifoldCFException if the value is neither of those.
+  */
+  public static String assessmentStateToString(int value)
+    throws ManifoldCFException
+  {
+    if (value == ASSESSMENT_KNOWN)
+      return "Y";
+    if (value == ASSESSMENT_UNKNOWN)
+      return "N";
+    // Anything else is not a valid assessment state.
+    throw new ManifoldCFException("Unknown assessment state value "+Integer.toString(value));
+  }
+
   /** Go from string to hopcount mode.
   */
   public static int stringToHopcountMode(String value)
@@ -3070,7 +3187,7 @@ public class Jobs extends org.apache.man
   {
     if (value == null || value.length() == 0)
       return HOPCOUNT_ACCURATE;
-    Integer x = (Integer)hopmodeMap.get(value);
+    Integer x = hopmodeMap.get(value);
     if (x == null)
       throw new ManifoldCFException("Bad hopcount mode value: '"+value+"'");
     return x.intValue();
@@ -3214,14 +3331,12 @@ public class Jobs extends org.apache.man
     throws ManifoldCFException
   {
     // Fetch all the jobs, but only once for each ID.  Then, assign each one by id into the final array.
-    HashMap uniqueIDs = new HashMap();
-    int i = 0;
-    while (i < ids.length)
+    Set<Long> uniqueIDs = new HashSet<Long>();
+    for (Long id : ids)
     {
-      uniqueIDs.put(ids[i],ids[i]);
-      i++;
+      uniqueIDs.add(id);
     }
-    HashMap returnValues = new HashMap();
+    Map<Long,JobDescription> returnValues = new HashMap<Long,JobDescription>();
     beginTransaction();
     try
     {
@@ -3229,8 +3344,7 @@ public class Jobs extends org.apache.man
       ArrayList params = new ArrayList();
       int j = 0;
       int maxIn = getMaxInClause();
-      Iterator iter = uniqueIDs.keySet().iterator();
-      while (iter.hasNext())
+      for (Long uniqueID : uniqueIDs)
       {
         if (j == maxIn)
         {
@@ -3242,7 +3356,7 @@ public class Jobs extends org.apache.man
         if (j > 0)
           sb.append(',');
         sb.append('?');
-        params.add((Long)iter.next());
+        params.add(uniqueID);
         j++;
       }
       if (j > 0)
@@ -3265,15 +3379,13 @@ public class Jobs extends org.apache.man
 
     // Build the return array
     JobDescription[] rval = new JobDescription[ids.length];
-    i = 0;
-    while (i < rval.length)
+    for (int i = 0; i < rval.length; i++)
     {
       Long id = ids[i];
-      JobDescription jd = (JobDescription)returnValues.get(id);
+      JobDescription jd = returnValues.get(id);
       if (jd != null)
         jd.makeReadOnly();
       rval[i] = jd;
-      i++;
     }
     return rval;
   }
@@ -3283,7 +3395,7 @@ public class Jobs extends org.apache.man
   *@param idList is the list of id's.
   *@param params is the set of parameters.
   */
-  protected void getJobsChunk(Map returnValues, String idList, ArrayList params)
+  protected void getJobsChunk(Map<Long,JobDescription> returnValues, String idList, ArrayList params)
     throws ManifoldCFException
   {
     try
@@ -3321,6 +3433,7 @@ public class Jobs extends org.apache.man
       }
 
       // Fill in schedules for jobs
+      pipelineManager.getRows(returnValues,idList,params);
       scheduleManager.getRows(returnValues,idList,params);
       hopFilterManager.getRows(returnValues,idList,params);
       forcedParamManager.getRows(returnValues,idList,params);

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ScheduleManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ScheduleManager.java?rev=1601529&r1=1601528&r2=1601529&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ScheduleManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ScheduleManager.java Mon Jun  9 23:19:08 2014
@@ -164,7 +164,7 @@ public class ScheduleManager extends org
   *@param ownerIDList is the list of owner id's.
   *@param ownerIDParams is the corresponding set of owner id parameters.
   */
-  public void getRows(Map returnValues, String ownerIDList, ArrayList ownerIDParams)
+  public void getRows(Map<Long,JobDescription> returnValues, String ownerIDList, ArrayList ownerIDParams)
     throws ManifoldCFException
   {
     IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+ownerIDField+" IN ("+ownerIDList+") ORDER BY "+ordinalField+" ASC",ownerIDParams,
@@ -183,7 +183,7 @@ public class ScheduleManager extends org
         (String)row.getValue(timezoneField),
         (Long)row.getValue(windowDurationField),
         stringToRequestMinimumValue((String)row.getValue(requestMinimumField)));
-      ((JobDescription)returnValues.get(ownerID)).addScheduleRecord(sr);
+      returnValues.get(ownerID).addScheduleRecord(sr);
       i++;
     }
   }