You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/12/20 14:39:45 UTC

svn commit: r1646978 - in /manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs: JobManager.java Jobs.java

Author: kwright
Date: Sat Dec 20 13:39:44 2014
New Revision: 1646978

URL: http://svn.apache.org/r1646978
Log:
ok, notification table set up for jobs

Modified:
    manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java

Modified: manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1646978&r1=1646977&r2=1646978&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Sat Dec 20 13:39:44 2014
@@ -426,13 +426,20 @@ public class JobManager implements IJobM
   protected void noteNotificationConnectionDeregistration(List<String> list)
     throws ManifoldCFException
   {
-    ArrayList newList = new ArrayList();
-    String query = database.buildConjunctionClause(newList,new ClauseDescription[]{
-      new MultiClause(jobs.notificationNameField,list)});
     // Query for the matching jobs, and then for each job potentially adjust the state
-    IResultSet set = database.performQuery("SELECT "+jobs.idField+","+jobs.statusField+" FROM "+
-      jobs.getTableName()+" WHERE "+query+" FOR UPDATE",
-      newList,null,null);
+    Long[] jobIDs = jobs.findJobsMatchingNotifications(list);
+    if (jobIDs.length == 0)
+      return;
+
+    StringBuilder query = new StringBuilder();
+    ArrayList newList = new ArrayList();
+    
+    query.append("SELECT ").append(jobs.idField).append(",").append(jobs.statusField)
+      .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+      .append(database.buildConjunctionClause(newList,new ClauseDescription[]{
+        new MultiClause(jobs.idField,jobIDs)}))
+      .append(" FOR UPDATE");
+    IResultSet set = database.performQuery(query.toString(),newList,null,null);
     int i = 0;
     while (i < set.getRowCount())
     {
@@ -479,19 +486,26 @@ public class JobManager implements IJobM
     throws ManifoldCFException
   {
     // Query for the matching jobs, and then for each job potentially adjust the state
+    Long[] jobIDs = jobs.findJobsMatchingNotifications(list);
+    if (jobIDs.length == 0)
+      return;
+
+    StringBuilder query = new StringBuilder();
     ArrayList newList = new ArrayList();
-    String query = database.buildConjunctionClause(newList,new ClauseDescription[]{
-      new MultiClause(jobs.notificationNameField,list)});
-    IResultSet set = database.performQuery("SELECT "+jobs.idField+","+jobs.statusField+" FROM "+
-      jobs.getTableName()+" WHERE "+query+" FOR UPDATE",
-      newList,null,null);
+    
+    query.append("SELECT ").append(jobs.idField).append(",").append(jobs.statusField)
+      .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+      .append(database.buildConjunctionClause(newList,new ClauseDescription[]{
+        new MultiClause(jobs.idField,jobIDs)}))
+      .append(" FOR UPDATE");
+    IResultSet set = database.performQuery(query.toString(),newList,null,null);
     int i = 0;
     while (i < set.getRowCount())
     {
       IResultRow row = set.getRow(i++);
       Long jobID = (Long)row.getValue(jobs.idField);
       int statusValue = jobs.stringToStatus((String)row.getValue(jobs.statusField));
-      jobs.noteNotificationConnectorRegistration(jobID,statusValue);
+      jobs.noteNotificationConnectorDeregistration(jobID,statusValue);
     }
   }
 

Modified: manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1646978&r1=1646977&r2=1646978&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Sat Dec 20 13:39:44 2014
@@ -42,7 +42,6 @@ import java.util.*;
  * <tr><td>endtime</td><td>BIGINT</td><td>operational field</td></tr>
  * <tr><td>docspec</td><td>LONGTEXT</td><td></td></tr>
  * <tr><td>connectionname</td><td>VARCHAR(32)</td><td>Reference:repoconnections.connectionname</td></tr>
- * <tr><td>notificationname</td><td>VARCHAR(32)</td><td>Reference:notificationconnections.connectionname</td></tr>
  * <tr><td>type</td><td>CHAR(1)</td><td></td></tr>
  * <tr><td>intervaltime</td><td>BIGINT</td><td></td></tr>
  * <tr><td>maxintervaltime</td><td>BIGINT</td><td></td></tr>
@@ -158,7 +157,6 @@ public class Jobs extends org.apache.man
   public final static String descriptionField = "description";
   public final static String documentSpecField = "docspec";
   public final static String connectionNameField = "connectionname";
-  public final static String notificationNameField = "notificationname";
   public final static String typeField = "type";
   /** This is the minimum reschedule interval for a document being crawled adaptively (in ms.) */
   public final static String intervalField = "intervaltime";
@@ -347,6 +345,7 @@ public class Jobs extends org.apache.man
   protected final ScheduleManager scheduleManager;
   protected final HopFilterManager hopFilterManager;
   protected final PipelineManager pipelineManager;
+  protected final NotificationManager notificationManager;
   
   protected final IOutputConnectionManager outputMgr;
   protected final IRepositoryConnectionManager connectionMgr;
@@ -368,6 +367,7 @@ public class Jobs extends org.apache.man
     scheduleManager = new ScheduleManager(threadContext,database);
     hopFilterManager = new HopFilterManager(threadContext,database);
     pipelineManager = new PipelineManager(threadContext,database);
+    notificationManager = new NotificationManager(threadContext,database);
     
     cacheManager = CacheManagerFactory.make(threadContext);
     lockManager = LockManagerFactory.make(threadContext);
@@ -409,7 +409,6 @@ public class Jobs extends org.apache.man
         map.put(endTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
         map.put(documentSpecField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
         map.put(this.connectionNameField,new ColumnDescription("VARCHAR(32)",false,false,connectionTableName,connectionNameField,false));
-        map.put(this.notificationNameField,new ColumnDescription("VARCHAR(32)",false,true,notificationConnectionTableName,notificationConnectionNameField,false));
         map.put(typeField,new ColumnDescription("CHAR(1)",false,false,null,null,false));
         map.put(intervalField,new ColumnDescription("BIGINT",false,true,null,null,false));
         map.put(maxIntervalField,new ColumnDescription("BIGINT",false,true,null,null,false));
@@ -430,12 +429,6 @@ public class Jobs extends org.apache.man
       else
       {
         // Do any needed upgrades
-        if (existing.get(notificationNameField) == null)
-        {
-          HashMap map = new HashMap();
-          map.put(this.notificationNameField,new ColumnDescription("VARCHAR(32)",false,true,notificationConnectionTableName,notificationConnectionNameField,false));
-          performAlter(map,null,null,null);
-        }
       }
 
       // Handle related tables
@@ -452,6 +445,7 @@ public class Jobs extends org.apache.man
           pipelineManager.writeOutputStage(id,outputConnectionName,outputConnectionSpec);
         }
       }
+      notificationManager.install(getTableName(),idField,notificationConnectionTableName,notificationConnectionNameField);
       scheduleManager.install(getTableName(),idField);
       hopFilterManager.install(getTableName(),idField);
 
@@ -459,7 +453,6 @@ public class Jobs extends org.apache.man
       IndexDescription statusIndex = new IndexDescription(false,new String[]{statusField,idField,priorityField});
       IndexDescription statusProcessIndex = new IndexDescription(false,new String[]{statusField,processIDField});
       IndexDescription connectionIndex = new IndexDescription(false,new String[]{connectionNameField});
-      IndexDescription notificationIndex = new IndexDescription(false,new String[]{notificationNameField});
       IndexDescription failTimeIndex = new IndexDescription(false,new String[]{failTimeField});
 
       // Get rid of indexes that shouldn't be there
@@ -476,8 +469,6 @@ public class Jobs extends org.apache.man
           statusProcessIndex = null;
         else if (connectionIndex != null && id.equals(connectionIndex))
           connectionIndex = null;
-        else if (notificationIndex != null && id.equals(notificationIndex))
-          notificationIndex = null;
         else if (failTimeIndex != null && id.equals(failTimeIndex))
           failTimeIndex = null;
         else if (indexName.indexOf("_pkey") == -1)
@@ -492,8 +483,6 @@ public class Jobs extends org.apache.man
         performAddIndex(null,statusProcessIndex);
       if (connectionIndex != null)
         performAddIndex(null,connectionIndex);
-      if (notificationIndex != null)
-        performAddIndex(null,notificationIndex);
       if (failTimeIndex != null)
         performAddIndex(null,failTimeIndex);
 
@@ -512,6 +501,7 @@ public class Jobs extends org.apache.man
     {
       hopFilterManager.deinstall();
       scheduleManager.deinstall();
+      notificationManager.deinstall();
       pipelineManager.deinstall();
       performDrop(null);
     }
@@ -549,6 +539,27 @@ public class Jobs extends org.apache.man
     analyzeTable();
   }
 
+  /** Find a list of jobs matching specified notification names.
+  */
+  public Long[] findJobsMatchingNotifications(List<String> notificationConnectionNames)
+    throws ManifoldCFException
+  {
+    StringBuilder query = new StringBuilder();
+    ArrayList params = new ArrayList();
+    query.append("SELECT ").append(idField)
+      .append(" FROM ").append(getTableName()).append(" t1 WHERE EXISTS(");
+    notificationManager.buildNotificationQueryClause(query,params,"t1."+idField,notificationConnectionNames);
+    query.append(")");
+    IResultSet set = performQuery(query.toString(),params,null,null);
+    Long[] rval = new Long[set.getRowCount()];
+    for (int i = 0; i < rval.length; i++)
+    {
+      IResultRow row = set.getRow(i);
+      rval[i] = (Long)row.getValue(idField);
+    }
+    return rval;
+  }
+
   /** Find a list of jobs matching specified transformation names.
   */
   public Long[] findJobsMatchingTransformations(List<String> transformationConnectionNames)
@@ -795,6 +806,7 @@ public class Jobs extends org.apache.man
         scheduleManager.deleteRows(id);
         hopFilterManager.deleteRows(id);
         pipelineManager.deleteRows(id);
+        notificationManager.deleteRows(id);
         ArrayList params = new ArrayList();
         String query = buildConjunctionClause(params,new ClauseDescription[]{
           new UnitaryClause(idField,id)});
@@ -948,6 +960,14 @@ public class Jobs extends org.apache.man
                     values.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
                 }
 
+                // Changing notifications should never reset seeding.
+                /*
+                if (isSame)
+                {
+                  isSame = notificationManager.compareRows(id,jobDescription);
+                }
+                */
+                
                 if (isSame)
                 {
                   String oldDocSpecXML = (String)row.getValue(documentSpecField);
@@ -966,6 +986,7 @@ public class Jobs extends org.apache.man
                   new UnitaryClause(idField,id)});
                 performUpdate(values," WHERE "+query,params,null);
                 pipelineManager.deleteRows(id);
+                notificationManager.deleteRows(id);
                 scheduleManager.deleteRows(id);
                 hopFilterManager.deleteRows(id);
               }
@@ -983,6 +1004,8 @@ public class Jobs extends org.apache.man
 
               // Write pipeline rows
               pipelineManager.writeRows(id,jobDescription);
+              // Write notification rows
+              notificationManager.writeRows(id,jobDescription);
               // Write schedule records
               scheduleManager.writeRows(id,jobDescription);
               // Write hop filter rows
@@ -3014,8 +3037,12 @@ public class Jobs extends org.apache.man
   public boolean checkIfNotificationReference(String connectionName)
     throws ManifoldCFException
   {
-    // MHL
-    return false;
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(notificationManager.notificationNameField,connectionName)});
+    IResultSet set = performQuery("SELECT "+notificationManager.ownerIDField+" FROM "+notificationManager.getTableName()+
+      " WHERE "+query,list,new StringSet(getJobsKey()),null);
+    return set.getRowCount() > 0;
   }
 
   /** See if there's a reference to an output connection name.
@@ -3575,6 +3602,7 @@ public class Jobs extends org.apache.man
 
       // Fill in schedules for jobs
       pipelineManager.getRows(returnValues,idList,params);
+      notificationManager.getRows(returnValues,idList,params);
       scheduleManager.getRows(returnValues,idList,params);
       hopFilterManager.getRows(returnValues,idList,params);
     }