You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/12/20 14:39:45 UTC
svn commit: r1646978 - in
/manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs:
JobManager.java Jobs.java
Author: kwright
Date: Sat Dec 20 13:39:44 2014
New Revision: 1646978
URL: http://svn.apache.org/r1646978
Log:
ok, notification table set up for jobs
Modified:
manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
Modified: manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1646978&r1=1646977&r2=1646978&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Sat Dec 20 13:39:44 2014
@@ -426,13 +426,20 @@ public class JobManager implements IJobM
protected void noteNotificationConnectionDeregistration(List<String> list)
throws ManifoldCFException
{
- ArrayList newList = new ArrayList();
- String query = database.buildConjunctionClause(newList,new ClauseDescription[]{
- new MultiClause(jobs.notificationNameField,list)});
// Query for the matching jobs, and then for each job potentially adjust the state
- IResultSet set = database.performQuery("SELECT "+jobs.idField+","+jobs.statusField+" FROM "+
- jobs.getTableName()+" WHERE "+query+" FOR UPDATE",
- newList,null,null);
+ Long[] jobIDs = jobs.findJobsMatchingNotifications(list);
+ if (jobIDs.length == 0)
+ return;
+
+ StringBuilder query = new StringBuilder();
+ ArrayList newList = new ArrayList();
+
+ query.append("SELECT ").append(jobs.idField).append(",").append(jobs.statusField)
+ .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+ .append(database.buildConjunctionClause(newList,new ClauseDescription[]{
+ new MultiClause(jobs.idField,jobIDs)}))
+ .append(" FOR UPDATE");
+ IResultSet set = database.performQuery(query.toString(),newList,null,null);
int i = 0;
while (i < set.getRowCount())
{
@@ -479,19 +486,26 @@ public class JobManager implements IJobM
throws ManifoldCFException
{
// Query for the matching jobs, and then for each job potentially adjust the state
+ Long[] jobIDs = jobs.findJobsMatchingNotifications(list);
+ if (jobIDs.length == 0)
+ return;
+
+ StringBuilder query = new StringBuilder();
ArrayList newList = new ArrayList();
- String query = database.buildConjunctionClause(newList,new ClauseDescription[]{
- new MultiClause(jobs.notificationNameField,list)});
- IResultSet set = database.performQuery("SELECT "+jobs.idField+","+jobs.statusField+" FROM "+
- jobs.getTableName()+" WHERE "+query+" FOR UPDATE",
- newList,null,null);
+
+ query.append("SELECT ").append(jobs.idField).append(",").append(jobs.statusField)
+ .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+ .append(database.buildConjunctionClause(newList,new ClauseDescription[]{
+ new MultiClause(jobs.idField,jobIDs)}))
+ .append(" FOR UPDATE");
+ IResultSet set = database.performQuery(query.toString(),newList,null,null);
int i = 0;
while (i < set.getRowCount())
{
IResultRow row = set.getRow(i++);
Long jobID = (Long)row.getValue(jobs.idField);
int statusValue = jobs.stringToStatus((String)row.getValue(jobs.statusField));
- jobs.noteNotificationConnectorRegistration(jobID,statusValue);
+ jobs.noteNotificationConnectorDeregistration(jobID,statusValue);
}
}
Modified: manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1646978&r1=1646977&r2=1646978&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/branches/CONNECTORS-1119/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Sat Dec 20 13:39:44 2014
@@ -42,7 +42,6 @@ import java.util.*;
* <tr><td>endtime</td><td>BIGINT</td><td>operational field</td></tr>
* <tr><td>docspec</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>connectionname</td><td>VARCHAR(32)</td><td>Reference:repoconnections.connectionname</td></tr>
- * <tr><td>notificationname</td><td>VARCHAR(32)</td><td>Reference:notificationconnections.connectionname</td></tr>
* <tr><td>type</td><td>CHAR(1)</td><td></td></tr>
* <tr><td>intervaltime</td><td>BIGINT</td><td></td></tr>
* <tr><td>maxintervaltime</td><td>BIGINT</td><td></td></tr>
@@ -158,7 +157,6 @@ public class Jobs extends org.apache.man
public final static String descriptionField = "description";
public final static String documentSpecField = "docspec";
public final static String connectionNameField = "connectionname";
- public final static String notificationNameField = "notificationname";
public final static String typeField = "type";
/** This is the minimum reschedule interval for a document being crawled adaptively (in ms.) */
public final static String intervalField = "intervaltime";
@@ -347,6 +345,7 @@ public class Jobs extends org.apache.man
protected final ScheduleManager scheduleManager;
protected final HopFilterManager hopFilterManager;
protected final PipelineManager pipelineManager;
+ protected final NotificationManager notificationManager;
protected final IOutputConnectionManager outputMgr;
protected final IRepositoryConnectionManager connectionMgr;
@@ -368,6 +367,7 @@ public class Jobs extends org.apache.man
scheduleManager = new ScheduleManager(threadContext,database);
hopFilterManager = new HopFilterManager(threadContext,database);
pipelineManager = new PipelineManager(threadContext,database);
+ notificationManager = new NotificationManager(threadContext,database);
cacheManager = CacheManagerFactory.make(threadContext);
lockManager = LockManagerFactory.make(threadContext);
@@ -409,7 +409,6 @@ public class Jobs extends org.apache.man
map.put(endTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
map.put(documentSpecField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(this.connectionNameField,new ColumnDescription("VARCHAR(32)",false,false,connectionTableName,connectionNameField,false));
- map.put(this.notificationNameField,new ColumnDescription("VARCHAR(32)",false,true,notificationConnectionTableName,notificationConnectionNameField,false));
map.put(typeField,new ColumnDescription("CHAR(1)",false,false,null,null,false));
map.put(intervalField,new ColumnDescription("BIGINT",false,true,null,null,false));
map.put(maxIntervalField,new ColumnDescription("BIGINT",false,true,null,null,false));
@@ -430,12 +429,6 @@ public class Jobs extends org.apache.man
else
{
// Do any needed upgrades
- if (existing.get(notificationNameField) == null)
- {
- HashMap map = new HashMap();
- map.put(this.notificationNameField,new ColumnDescription("VARCHAR(32)",false,true,notificationConnectionTableName,notificationConnectionNameField,false));
- performAlter(map,null,null,null);
- }
}
// Handle related tables
@@ -452,6 +445,7 @@ public class Jobs extends org.apache.man
pipelineManager.writeOutputStage(id,outputConnectionName,outputConnectionSpec);
}
}
+ notificationManager.install(getTableName(),idField,notificationConnectionTableName,notificationConnectionNameField);
scheduleManager.install(getTableName(),idField);
hopFilterManager.install(getTableName(),idField);
@@ -459,7 +453,6 @@ public class Jobs extends org.apache.man
IndexDescription statusIndex = new IndexDescription(false,new String[]{statusField,idField,priorityField});
IndexDescription statusProcessIndex = new IndexDescription(false,new String[]{statusField,processIDField});
IndexDescription connectionIndex = new IndexDescription(false,new String[]{connectionNameField});
- IndexDescription notificationIndex = new IndexDescription(false,new String[]{notificationNameField});
IndexDescription failTimeIndex = new IndexDescription(false,new String[]{failTimeField});
// Get rid of indexes that shouldn't be there
@@ -476,8 +469,6 @@ public class Jobs extends org.apache.man
statusProcessIndex = null;
else if (connectionIndex != null && id.equals(connectionIndex))
connectionIndex = null;
- else if (notificationIndex != null && id.equals(notificationIndex))
- notificationIndex = null;
else if (failTimeIndex != null && id.equals(failTimeIndex))
failTimeIndex = null;
else if (indexName.indexOf("_pkey") == -1)
@@ -492,8 +483,6 @@ public class Jobs extends org.apache.man
performAddIndex(null,statusProcessIndex);
if (connectionIndex != null)
performAddIndex(null,connectionIndex);
- if (notificationIndex != null)
- performAddIndex(null,notificationIndex);
if (failTimeIndex != null)
performAddIndex(null,failTimeIndex);
@@ -512,6 +501,7 @@ public class Jobs extends org.apache.man
{
hopFilterManager.deinstall();
scheduleManager.deinstall();
+ notificationManager.deinstall();
pipelineManager.deinstall();
performDrop(null);
}
@@ -549,6 +539,27 @@ public class Jobs extends org.apache.man
analyzeTable();
}
+ /** Find a list of jobs matching specified notification names.
+ */
+ public Long[] findJobsMatchingNotifications(List<String> notificationConnectionNames)
+ throws ManifoldCFException
+ {
+ StringBuilder query = new StringBuilder();
+ ArrayList params = new ArrayList();
+ query.append("SELECT ").append(idField)
+ .append(" FROM ").append(getTableName()).append(" t1 WHERE EXISTS(");
+ notificationManager.buildNotificationQueryClause(query,params,"t1."+idField,notificationConnectionNames);
+ query.append(")");
+ IResultSet set = performQuery(query.toString(),params,null,null);
+ Long[] rval = new Long[set.getRowCount()];
+ for (int i = 0; i < rval.length; i++)
+ {
+ IResultRow row = set.getRow(i);
+ rval[i] = (Long)row.getValue(idField);
+ }
+ return rval;
+ }
+
/** Find a list of jobs matching specified transformation names.
*/
public Long[] findJobsMatchingTransformations(List<String> transformationConnectionNames)
@@ -795,6 +806,7 @@ public class Jobs extends org.apache.man
scheduleManager.deleteRows(id);
hopFilterManager.deleteRows(id);
pipelineManager.deleteRows(id);
+ notificationManager.deleteRows(id);
ArrayList params = new ArrayList();
String query = buildConjunctionClause(params,new ClauseDescription[]{
new UnitaryClause(idField,id)});
@@ -948,6 +960,14 @@ public class Jobs extends org.apache.man
values.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
}
+ // Changing notifications should never reset seeding.
+ /*
+ if (isSame)
+ {
+ isSame = notificationManager.compareRows(id,jobDescription);
+ }
+ */
+
if (isSame)
{
String oldDocSpecXML = (String)row.getValue(documentSpecField);
@@ -966,6 +986,7 @@ public class Jobs extends org.apache.man
new UnitaryClause(idField,id)});
performUpdate(values," WHERE "+query,params,null);
pipelineManager.deleteRows(id);
+ notificationManager.deleteRows(id);
scheduleManager.deleteRows(id);
hopFilterManager.deleteRows(id);
}
@@ -983,6 +1004,8 @@ public class Jobs extends org.apache.man
// Write pipeline rows
pipelineManager.writeRows(id,jobDescription);
+ // Write notification rows
+ notificationManager.writeRows(id,jobDescription);
// Write schedule records
scheduleManager.writeRows(id,jobDescription);
// Write hop filter rows
@@ -3014,8 +3037,12 @@ public class Jobs extends org.apache.man
public boolean checkIfNotificationReference(String connectionName)
throws ManifoldCFException
{
- // MHL
- return false;
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(notificationManager.notificationNameField,connectionName)});
+ IResultSet set = performQuery("SELECT "+notificationManager.ownerIDField+" FROM "+notificationManager.getTableName()+
+ " WHERE "+query,list,new StringSet(getJobsKey()),null);
+ return set.getRowCount() > 0;
}
/** See if there's a reference to an output connection name.
@@ -3575,6 +3602,7 @@ public class Jobs extends org.apache.man
// Fill in schedules for jobs
pipelineManager.getRows(returnValues,idList,params);
+ notificationManager.getRows(returnValues,idList,params);
scheduleManager.getRows(returnValues,idList,params);
hopFilterManager.getRows(returnValues,idList,params);
}