You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2010/08/31 21:12:15 UTC
svn commit: r991295 - in /incubator/lcf/trunk/modules:
connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/
framework/agents/src/main/java/org/apache/acf/agents/interfaces/
framework/agents/src/main/java/org/apache/acf/agents/outp...
Author: kwright
Date: Tue Aug 31 19:12:14 2010
New Revision: 991295
URL: http://svn.apache.org/viewvc?rev=991295&view=rev
Log:
Add support for job end notification to output connectors, with guaranteed retry until the notification succeeds. Fix for CONNECTORS-41.
Added:
incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java (with props)
Modified:
incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java
incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java
incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java
incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java
incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp
incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java
incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java
incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java
incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java
incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java
incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java
incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java
incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java
Modified: incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java (original)
+++ incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java Tue Aug 31 19:12:14 2010
@@ -176,6 +176,77 @@ public class HttpPoster
interruptionRetryTime = new Long(x).longValue();
}
+ /** Ask the update request handler to commit.
+ * The HTTP exchange runs on a separate CommitThread so it can be abandoned if this thread is interrupted. I/O failures
+ * are retried up to five more times, ten seconds apart. After that, a ServiceInterruption is thrown so the caller can try later.
+ *@throws ServiceInterruption if I/O errors persist through every retry, or if the commit thread recorded one.
+ *@throws ACFException if the commit failed for a non-I/O reason, or if this thread was interrupted.
+ */
+ public void commitPost()
+ throws ACFException, ServiceInterruption
+ {
+ if (Logging.ingest.isDebugEnabled())
+ Logging.ingest.debug("commitPost()");
+
+ // retriesLeft counts down from 5, so there are at most six attempts in all.
+ for (int retriesLeft = 5; ; retriesLeft--)
+ {
+ try
+ {
+ CommitThread commitThread = new CommitThread();
+ try
+ {
+ commitThread.start();
+ commitThread.join();
+ }
+ catch (InterruptedException e)
+ {
+ // Abandon the worker; it is a daemon thread.
+ commitThread.interrupt();
+ throw new ACFException("Interrupted: "+e.getMessage(),ACFException.INTERRUPTED);
+ }
+
+ // Re-throw whatever the worker recorded, keeping its type. An IOException lands in the retry handler below.
+ Throwable failure = commitThread.getException();
+ if (failure == null)
+ return;
+ if (failure instanceof ServiceInterruption)
+ throw (ServiceInterruption)failure;
+ if (failure instanceof ACFException)
+ throw (ACFException)failure;
+ if (failure instanceof IOException)
+ throw (IOException)failure;
+ if (failure instanceof RuntimeException)
+ throw (RuntimeException)failure;
+ throw (Error)failure;
+ }
+ catch (IOException ioe)
+ {
+ if (retriesLeft == 0)
+ {
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("IO exception committing: "+ioe.getMessage(),
+ ioe,
+ currentTime + interruptionRetryTime,
+ currentTime + 2L * 60L * 60000L,
+ -1,
+ true);
+ }
+ }
+
+ // Wait before the next attempt.
+ try
+ {
+ ACF.sleep(10000L);
+ }
+ catch (InterruptedException e)
+ {
+ throw new ACFException("Interrupted",ACFException.INTERRUPTED);
+ }
+ }
+ }
/**
* Post the input stream to ingest
@@ -1524,6 +1595,130 @@ public class HttpPoster
return activityDetails;
}
}
+
+ /** Killable thread that does a commit.
+ * Java 1.5 stopped permitting thread interruptions to abort socket waits. As a result, it is impossible to get threads to shutdown cleanly that are doing
+ * such waits. So, the places where this happens are segregated in their own threads so that they can be just abandoned.
+ *
+ * This thread sends one commit request ("GET <postUpdateAction>?commit=true") to the update request handler and reads the reply.
+ * Failures are never thrown out of run(). They are stored, and the starting thread must check getException() after join().
+ * A null exception means the handler answered HTTP 200 and the reply was parsed without error.
+ */
+ protected class CommitThread extends java.lang.Thread
+ {
+ /** Whatever went wrong during the commit, or null if it succeeded. The starting thread reads it after join(). */
+ protected Throwable exception = null;
+
+ public CommitThread()
+ {
+ super();
+ // Daemon, so an abandoned (stuck) commit thread cannot keep the JVM alive.
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ // Open a socket to the update request handler, send the commit request, and read the response.
+ try
+ {
+ // Set up the socket, and the (optional) secure socket.
+ Socket socket = createSocket(responseRetries);
+ try
+ {
+ InputStream in = socket.getInputStream();
+ try
+ {
+ OutputStream out = socket.getOutputStream();
+ try
+ {
+ // Request line for the commit
+ byte[] tmp = ("GET " + postUpdateAction + "?commit=true HTTP/1.0\r\n").getBytes("ASCII");
+ out.write(tmp, 0, tmp.length);
+
+ writeCredentials(out);
+
+ // No body; the blank line ends the headers
+ tmp = ("Content-Length: 0\r\n\r\n").getBytes("ASCII");
+ out.write(tmp, 0, tmp.length);
+
+ if (Logging.ingest.isDebugEnabled())
+ Logging.ingest.debug("Commit request posted");
+
+ out.flush();
+
+ CodeDetails cd = getResponse(in);
+
+ int codeValue = cd.getCodeValue();
+ if (codeValue < 0)
+ throw new ACFException("Http protocol error");
+
+ // 200 means everything went OK
+ if (codeValue == 200)
+ {
+ cd.parseCommitResponse();
+ return;
+ }
+
+ // 401 is reported as a setup error (bad credentials) rather than a generic failure.
+ if (codeValue == 401)
+ throw new ACFException("Bad credentials for commit request",ACFException.SETUP_ERROR);
+
+ // Any other status means the commit request failed.
+ throw new ACFException("Error connecting to update request API: '"+cd.getDescription()+"'");
+ }
+ finally
+ {
+ out.close();
+ }
+ }
+ finally
+ {
+ in.close();
+ }
+ }
+ finally
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (InterruptedIOException e)
+ {
+ throw e;
+ }
+ catch (IOException e)
+ {
+ Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
+ // Do NOT rethrow
+ }
+ }
+ }
+ catch (UnsupportedEncodingException ioe)
+ {
+ throw new ACFException("Fatal commit error: "+ioe.getMessage(),ioe);
+ }
+ catch (InterruptedIOException ioe)
+ {
+ // Do NOT just return here. SocketTimeoutException is also an InterruptedIOException. Returning silently would leave
+ // exception == null, and commitPost() would then report a commit that never happened. Record the exception instead.
+ // If the interrupt came from commitPost() giving up, nobody reads the recorded value, so this is harmless.
+ throw ioe;
+ }
+ catch (IOException ioe)
+ {
+ // Log the error
+ Logging.ingest.warn("Error communicating with update request handler: "+ioe.getMessage(),ioe);
+ throw ioe;
+ }
+ }
+ catch (Throwable e)
+ {
+ this.exception = e;
+ }
+ }
+
+ /** Get the failure captured by run().
+ *@return the exception raised during the commit, or null if the commit succeeded (or run() has not finished).
+ */
+ public Throwable getException()
+ {
+ return exception;
+ }
+ }
+
/** Killable thread that does a status check.
* Java 1.5 stopped permitting thread interruptions to abort socket waits. As a result, it is impossible to get threads to shutdown cleanly that are doing
@@ -1775,6 +1970,12 @@ public class HttpPoster
parseIngestionResponse();
}
+ /** Parse the response to a commit request.
+ * Commit responses are currently handled exactly like ingestion responses, so this simply delegates to parseIngestionResponse().
+ */
+ public void parseCommitResponse()
+ throws ACFException
+ {
+ parseIngestionResponse();
+ }
+
public void parseStatusResponse()
throws ACFException
{
Modified: incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java (original)
+++ incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java Tue Aug 31 19:12:14 2010
@@ -358,6 +358,20 @@ public class SolrConnector extends org.a
poster.deletePost(documentURI,activities);
}
+ /** Tell the Solr connector that a job has ended, either completed or aborted.
+ * The end of a job is a natural synchronization point, so a commit request is sent via the poster.
+ * Any failure from the commit is passed straight through to the caller.
+ */
+ public void noteJobComplete()
+ throws ACFException, ServiceInterruption
+ {
+ // The poster is used next, so establish the session first.
+ this.getSession();
+ this.poster.commitPost();
+ }
+
// UI support methods.
//
// These support methods come in two varieties. The first bunch is involved in setting up connection configuration information. The second bunch
Modified: incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java (original)
+++ incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java Tue Aug 31 19:12:14 2010
@@ -132,6 +132,13 @@ public interface IOutputConnector extend
public void removeDocument(String documentURI, String outputDescription, IOutputRemoveActivity activities)
throws ACFException, ServiceInterruption;
+ /** Notify the connector of a completed job.
+ * This is meant to allow the connector to flush any internal data structures it has been keeping around, or to tell the output repository that this
+ * is a good time to synchronize things. It is called whenever a job is either completed or aborted.
+ *
+ * NOTE(review): the framework is expected to keep retrying this notification until it succeeds (presumably via JobNotificationThread).
+ * Implementations may therefore be invoked more than once for the same job end and should tolerate that; confirm against the caller.
+ *@throws ACFException on error.
+ *@throws ServiceInterruption if the notification could not be delivered right now and should be retried later.
+ */
+ public void noteJobComplete()
+ throws ACFException, ServiceInterruption;
+
// UI support methods.
//
// These support methods come in two varieties. The first bunch (inherited from IConnector) is involved in setting up connection configuration information.
Modified: incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java (original)
+++ incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java Tue Aug 31 19:12:14 2010
@@ -69,6 +69,16 @@ public abstract class BaseOutputConnecto
throw new ACFException("Unrecognized output connector command '"+command+"'");
}
+ /** Notify the connector of a completed job.
+ * This is meant to allow the connector to flush any internal data structures it has been keeping around, or to tell the output repository that this
+ * is a good time to synchronize things. It is called whenever a job is either completed or aborted.
+ *
+ * This default implementation does nothing. Connectors that buffer data, or whose repository needs an explicit
+ * synchronization step, should override it.
+ */
+ public void noteJobComplete()
+ throws ACFException, ServiceInterruption
+ {
+ // The base implementation does nothing here.
+ }
+
/** Detect if a mime type is indexable or not. This method is used by participating repository connectors to pre-filter the number of
* unusable documents that will be passed to this output connector.
*@param mimeType is the mime type of the document.
Modified: incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp (original)
+++ incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp Tue Aug 31 19:12:14 2010
@@ -149,6 +149,9 @@ if (maintenanceUnderway == false)
case JobStatus.JOBSTATUS_JOBENDCLEANUP:
statusName = "Terminating";
break;
+ case JobStatus.JOBSTATUS_JOBENDNOTIFICATION:
+ statusName = "End notification";
+ break;
case JobStatus.JOBSTATUS_ERROR:
statusName = "Error: "+js.getErrorText();
break;
Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java Tue Aug 31 19:12:14 2010
@@ -106,6 +106,8 @@ public class ListJobStatuses
return "running no connector";
case JobStatus.JOBSTATUS_JOBENDCLEANUP:
return "terminating";
+ case JobStatus.JOBSTATUS_JOBENDNOTIFICATION:
+ return "notifying";
default:
return "unknown";
}
Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java Tue Aug 31 19:12:14 2010
@@ -621,7 +621,18 @@ public interface IJobManager
public JobStartRecord[] getJobsReadyForStartup()
throws ACFException;
+ /** Find the list of jobs that need to have their connectors notified of job completion.
+ * These are jobs that have finished or been aborted but have not yet been made inactive.
+ *@return the ID's of jobs that need their output connectors notified in order to become inactive.
+ */
+ public Long[] getJobsReadyForInactivity()
+ throws ACFException;
+
+ /** Inactivate a job, from the notification state.
+ * Intended to be called once the job's output connector has been notified of completion.
+ *@param jobID is the ID of the job to inactivate.
+ */
+ public void inactivateJob(Long jobID)
+ throws ACFException;
+
/** Reset a starting job back to "ready for startup" state.
*@param jobID is the job id.
*/
@@ -740,16 +751,16 @@ public interface IJobManager
throws ACFException;
/** Put all eligible jobs in the "shutting down" state.
- *@param finishList is filled in with the set of IJobDescription objects that were completed.
*/
- public void finishJobs(ArrayList finishList)
+ public void finishJobs()
throws ACFException;
/** Reset eligible jobs back to "inactive" state. This method is used to pick up all jobs in the shutting down state
* whose purgatory records have been all cleaned up.
*@param currentTime is the current time in milliseconds since epoch.
+ *@param resetJobs is filled in with the set of IJobDescription objects that were reset.
*/
- public void resetJobs(long currentTime)
+ public void resetJobs(long currentTime, ArrayList resetJobs)
throws ACFException;
Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java Tue Aug 31 19:12:14 2010
@@ -38,6 +38,7 @@ public class JobStatus
public final static int JOBSTATUS_RESTARTING = 9;
public final static int JOBSTATUS_RUNNING_UNINSTALLED = 10;
public final static int JOBSTATUS_JOBENDCLEANUP = 11;
+ public final static int JOBSTATUS_JOBENDNOTIFICATION = 12;
// Member variables.
Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java Tue Aug 31 19:12:14 2010
@@ -4807,6 +4807,65 @@ public class JobManager implements IJobM
}
}
+ /** Inactivate a job, from the notification state.
+ * The job's row is locked and its status re-checked inside a transaction. Only a job currently in the
+ * "notifying of completion" state is moved to inactive. Database transaction aborts cause the whole transaction
+ * to be retried after a pause of getRandomAmount() milliseconds.
+ *@param jobID is the ID of the job to inactivate.
+ *@throws ACFException if the job does not exist, is in any other state, or any other database error occurs.
+ */
+ public void inactivateJob(Long jobID)
+ throws ACFException
+ {
+ // No known flow should leave the job in a different state when this is called; the status is verified anyway, as a precaution.
+ for (;;)
+ {
+ long backoff = 0L;
+ database.beginTransaction();
+ try
+ {
+ ArrayList params = new ArrayList();
+ params.add(jobID);
+ IResultSet result = database.performQuery("SELECT "+jobs.statusField+" FROM "+jobs.getTableName()+
+ " WHERE "+jobs.idField+"=? FOR UPDATE",params,null,null);
+ if (result.getRowCount() == 0)
+ throw new ACFException("No such job: "+jobID);
+ int currentStatus = jobs.stringToStatus((String)result.getRow(0).getValue(jobs.statusField));
+ if (currentStatus != Jobs.STATUS_NOTIFYINGOFCOMPLETION)
+ throw new ACFException("Unexpected job status: "+Integer.toString(currentStatus));
+ jobs.notificationComplete(jobID);
+ return;
+ }
+ catch (ACFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() != e.DATABASE_TRANSACTION_ABORT)
+ throw e;
+ // Transaction aborted by the database: fall through, pause, and try the whole transaction again.
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted clearing notification state for job: "+e.getMessage());
+ backoff = getRandomAmount();
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(backoff);
+ }
+ }
+ }
+
/** Reset a starting job back to "ready for startup" state.
*@param jobID is the job id.
*/
@@ -5149,9 +5208,8 @@ public class JobManager implements IJobM
}
/** Put all eligible jobs in the "shutting down" state.
- *@param finishList is filled in with the set of IJobDescription objects that were completed.
*/
- public void finishJobs(ArrayList finishList)
+ public void finishJobs()
throws ACFException
{
while (true)
@@ -5216,10 +5274,7 @@ public class JobManager implements IJobM
if (confirmSet.getRowCount() > 0)
continue;
- IJobDescription jobDescription = jobs.load(jobID,true);
-
// Mark status of job as "finishing"
- finishList.add(jobDescription);
jobs.writeStatus(jobID,jobs.STATUS_SHUTTINGDOWN);
if (Logging.jobs.isDebugEnabled())
{
@@ -5254,6 +5309,32 @@ public class JobManager implements IJobM
}
}
+ /** List every job currently in the "notifying of completion" state.
+ * Each of these jobs is waiting for its output connector to be told that the job has ended before it can become inactive.
+ *@return the IDs of those jobs (empty if there are none).
+ */
+ public Long[] getJobsReadyForInactivity()
+ throws ACFException
+ {
+ IResultSet result = database.performQuery("SELECT "+jobs.idField+" FROM "+
+ jobs.getTableName()+" WHERE "+jobs.statusField+"="+
+ database.quoteSQLString(jobs.statusToString(jobs.STATUS_NOTIFYINGOFCOMPLETION)),null,null,null);
+ Long[] jobIDs = new Long[result.getRowCount()];
+ for (int rowIndex = 0; rowIndex < jobIDs.length; rowIndex++)
+ {
+ Long jobID = (Long)result.getRow(rowIndex).getValue(jobs.idField);
+ jobIDs[rowIndex] = jobID;
+ if (Logging.jobs.isDebugEnabled())
+ Logging.jobs.debug("Found job "+jobID+" in need of notification");
+ }
+ return jobIDs;
+ }
+
/** Complete the sequence that aborts jobs and makes them runnable again.
*@param timestamp is the current time.
*@param abortJobs is the set of IJobDescription objects that were aborted (and stopped).
@@ -5359,8 +5440,9 @@ public class JobManager implements IJobM
/** Reset eligible jobs back to "inactive" state. This method is used to pick up all jobs in the shutting down state
* whose purgatory or being-deleted records have been all cleaned up.
*@param currentTime is the current time in milliseconds since epoch.
+ *@param resetJobs is filled in with the set of IJobDescription objects that were reset.
*/
- public void resetJobs(long currentTime)
+ public void resetJobs(long currentTime, ArrayList resetJobs)
throws ACFException
{
while (true)
@@ -5403,7 +5485,9 @@ public class JobManager implements IJobM
if (confirmSet.getRowCount() > 0)
continue;
-
+ IJobDescription jobDesc = jobs.load(jobID,true);
+ resetJobs.add(jobDesc);
+
// Label the job "finished"
jobs.finishJob(jobID,currentTime);
if (Logging.jobs.isDebugEnabled())
@@ -5630,6 +5714,9 @@ public class JobManager implements IJobM
case Jobs.STATUS_SHUTTINGDOWN:
rstatus = JobStatus.JOBSTATUS_JOBENDCLEANUP;
break;
+ case Jobs.STATUS_NOTIFYINGOFCOMPLETION:
+ rstatus = JobStatus.JOBSTATUS_JOBENDNOTIFICATION;
+ break;
case Jobs.STATUS_ABORTING:
case Jobs.STATUS_ABORTINGSEEDING:
case Jobs.STATUS_ABORTINGSTARTINGUP:
Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java Tue Aug 31 19:12:14 2010
@@ -33,24 +33,25 @@ public class Jobs extends org.apache.acf
// Status field values
public static final int STATUS_INACTIVE = 0; // Not running
public static final int STATUS_ACTIVE = 1; // Active, within a valid window
- public static final int STATUS_PAUSED = 2; // Paused, but within a valid window
- public static final int STATUS_SHUTTINGDOWN = 3; // Done, except for process cleanup
- public static final int STATUS_ACTIVEWAIT = 4; // Active, but paused due to window expiration
- public static final int STATUS_PAUSEDWAIT = 5; // Paused, and outside of window expiration
- public static final int STATUS_ABORTING = 6; // Aborting (not yet aborted because documents still being processed)
- public static final int STATUS_STARTINGUP = 7; // Loading the queue (will go into ACTIVE if successful, or INACTIVE if not)
- public static final int STATUS_ABORTINGSTARTINGUP = 8; // Will abort once the queue loading is complete
+ public static final int STATUS_PAUSED = 2; // Paused, but within a valid window
+ public static final int STATUS_SHUTTINGDOWN = 3; // Done, except for process cleanup
+ public static final int STATUS_ACTIVEWAIT = 4; // Active, but paused due to window expiration
+ public static final int STATUS_PAUSEDWAIT = 5; // Paused, and outside of window expiration
+ public static final int STATUS_ABORTING = 6; // Aborting (not yet aborted because documents still being processed)
+ public static final int STATUS_STARTINGUP = 7; // Loading the queue (will go into ACTIVE if successful, or INACTIVE if not)
+ public static final int STATUS_ABORTINGSTARTINGUP = 8; // Will abort once the queue loading is complete
public static final int STATUS_READYFORSTARTUP = 9; // Job is marked for startup; startup thread has not taken it yet.
public static final int STATUS_READYFORDELETE = 10; // Job is marked for delete; delete thread has not taken it yet.
public static final int STATUS_ACTIVESEEDING = 11; // Same as active, but seeding process is currently active also.
public static final int STATUS_ABORTINGSEEDING = 12; // Same as aborting, but seeding process is currently active also.
public static final int STATUS_PAUSEDSEEDING = 13; // Same as paused, but seeding process is currently active also.
- public static final int STATUS_ACTIVEWAITSEEDING = 14; // Same as active wait, but seeding process is currently active also.
- public static final int STATUS_PAUSEDWAITSEEDING = 15; // Same as paused wait, but seeding process is currently active also.
- public static final int STATUS_ABORTINGFORRESTART = 16; // Same as aborting, except after abort is complete startup will happen.
+ public static final int STATUS_ACTIVEWAITSEEDING = 14; // Same as active wait, but seeding process is currently active also.
+ public static final int STATUS_PAUSEDWAITSEEDING = 15; // Same as paused wait, but seeding process is currently active also.
+ public static final int STATUS_ABORTINGFORRESTART = 16; // Same as aborting, except after abort is complete startup will happen.
public static final int STATUS_ABORTINGFORRESTARTSEEDING = 17; // Seeding version of aborting for restart
public static final int STATUS_ABORTINGSTARTINGUPFORRESTART = 18; // Starting up version of aborting for restart
-
+ public static final int STATUS_NOTIFYINGOFCOMPLETION = 19; // Notifying connector of terminating job (either aborted, or finished)
+
// These statuses have to do with whether a job has an installed underlying connector or not.
// There are two reasons to have a special state here: (1) if the behavior of the crawler differs, or (2) if the
// UI would present something different.
@@ -135,6 +136,7 @@ public class Jobs extends org.apache.acf
statusMap.put("A",new Integer(STATUS_ACTIVE));
statusMap.put("P",new Integer(STATUS_PAUSED));
statusMap.put("S",new Integer(STATUS_SHUTTINGDOWN));
+ statusMap.put("s",new Integer(STATUS_NOTIFYINGOFCOMPLETION));
statusMap.put("W",new Integer(STATUS_ACTIVEWAIT));
statusMap.put("Z",new Integer(STATUS_PAUSEDWAIT));
statusMap.put("X",new Integer(STATUS_ABORTING));
@@ -1635,7 +1637,7 @@ public class Jobs extends org.apache.acf
ArrayList list = new ArrayList();
list.add(jobID);
HashMap map = new HashMap();
- map.put(statusField,statusToString(STATUS_INACTIVE));
+ map.put(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION));
map.put(errorField,null);
map.put(endTimeField,new Long(finishTime));
map.put(lastTimeField,new Long(finishTime));
@@ -1654,7 +1656,7 @@ public class Jobs extends org.apache.acf
ArrayList list = new ArrayList();
list.add(jobID);
HashMap map = new HashMap();
- map.put(statusField,statusToString(STATUS_INACTIVE));
+ map.put(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION));
map.put(endTimeField,null);
map.put(lastTimeField,new Long(abortTime));
map.put(windowEndField,null);
@@ -1663,6 +1665,20 @@ public class Jobs extends org.apache.acf
performUpdate(map,"WHERE "+idField+"=?",list,new StringSet(getJobStatusKey()));
}
+ /** Record that the output connector has been told about the end of this job, moving the job to the inactive state.
+ *@param jobID is the job id.
+ */
+ public void notificationComplete(Long jobID)
+ throws ACFException
+ {
+ // Only the status column is written. Every other value set by the preceding finish/abort step is left as it is.
+ HashMap columns = new HashMap();
+ columns.put(statusField,statusToString(STATUS_INACTIVE));
+ ArrayList whereParams = new ArrayList();
+ whereParams.add(jobID);
+ performUpdate(columns,"WHERE "+idField+"=?",whereParams,new StringSet(getJobStatusKey()));
+ }
+
/** See if there's a reference to a connection name.
*@param connectionName is the name of the connection.
*@return true if there is a reference, false otherwise.
@@ -1807,6 +1823,8 @@ public class Jobs extends org.apache.acf
return "P";
case STATUS_SHUTTINGDOWN:
return "S";
+ case STATUS_NOTIFYINGOFCOMPLETION:
+ return "s";
case STATUS_ACTIVEWAIT:
return "W";
case STATUS_PAUSEDWAIT:
Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java Tue Aug 31 19:12:14 2010
@@ -38,6 +38,7 @@ public class ACF extends org.apache.acf.
protected static JobStartThread jobStartThread = null;
protected static StufferThread stufferThread = null;
protected static FinisherThread finisherThread = null;
+ protected static JobNotificationThread notificationThread = null;
protected static StartupThread startupThread = null;
protected static JobDeleteThread jobDeleteThread = null;
protected static WorkerThread[] workerThreads = null;
@@ -189,6 +190,7 @@ public class ACF extends org.apache.acf.
jobStartThread = new JobStartThread();
startupThread = new StartupThread(queueTracker);
finisherThread = new FinisherThread();
+ notificationThread = new JobNotificationThread();
jobDeleteThread = new JobDeleteThread();
stufferThread = new StufferThread(documentQueue,numWorkerThreads,workerResetManager,queueTracker,blockingDocuments,lowWaterFactor,stuffAmtFactor);
expireStufferThread = new ExpireStufferThread(expireQueue,numExpireThreads,workerResetManager);
@@ -289,6 +291,7 @@ public class ACF extends org.apache.acf.
jobStartThread.start();
startupThread.start();
finisherThread.start();
+ notificationThread.start();
jobDeleteThread.start();
stufferThread.start();
expireStufferThread.start();
@@ -346,7 +349,7 @@ public class ACF extends org.apache.acf.
synchronized (startupLock)
{
while (initializationThread != null || jobDeleteThread != null || startupThread != null || jobStartThread != null || stufferThread != null ||
- finisherThread != null || workerThreads != null || expireStufferThread != null | expireThreads != null ||
+ finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null | expireThreads != null ||
deleteStufferThread != null || deleteThreads != null ||
jobResetThread != null || seedingThread != null || idleCleanupThread != null || setPriorityThread != null)
{
@@ -385,6 +388,10 @@ public class ACF extends org.apache.acf.
{
finisherThread.interrupt();
}
+ if (notificationThread != null)
+ {
+ notificationThread.interrupt();
+ }
if (workerThreads != null)
{
int i = 0;
@@ -482,6 +489,11 @@ public class ACF extends org.apache.acf.
if (!finisherThread.isAlive())
finisherThread = null;
}
+ if (notificationThread != null)
+ {
+ if (!notificationThread.isAlive())
+ notificationThread = null;
+ }
if (workerThreads != null)
{
int i = 0;
@@ -2514,6 +2526,8 @@ public class ACF extends org.apache.acf.
return "running no connector";
case JobStatus.JOBSTATUS_JOBENDCLEANUP:
return "terminating";
+ case JobStatus.JOBSTATUS_JOBENDNOTIFICATION:
+ return "notifying";
default:
return "unknown";
}
Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java Tue Aug 31 19:12:14 2010
@@ -61,16 +61,7 @@ public class FinisherThread extends Thre
{
Logging.threads.debug("Cleaning up completed jobs...");
// See if there are any completed jobs
- ArrayList doneJobs = new ArrayList();
- jobManager.finishJobs(doneJobs);
- int k = 0;
- while (k < doneJobs.size())
- {
- IJobDescription desc = (IJobDescription)doneJobs.get(k++);
- connectionManager.recordHistory(desc.getConnectionName(),
- null,connectionManager.ACTIVITY_JOBEND,null,
- desc.getID().toString()+"("+desc.getDescription()+")",null,null,null);
- }
+ jobManager.finishJobs();
Logging.threads.debug("Done cleaning up completed jobs");
ACF.sleep(10000L);
}
Added: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java?rev=991295&view=auto
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java (added)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java Tue Aug 31 19:12:14 2010
@@ -0,0 +1,172 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.acf.crawler.system;
+
+import org.apache.acf.core.interfaces.*;
+import org.apache.acf.agents.interfaces.*;
+import org.apache.acf.crawler.interfaces.*;
+import org.apache.acf.crawler.system.Logging;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This class represents the thread that notices jobs that have completed their "notify connector" phase, and resets them back to
+* inactive.
+*/
+public class JobNotificationThread extends Thread
+{
+ public static final String _rcsid = "@(#)$Id$";
+
+ /** Constructor.
+ */
+ public JobNotificationThread()
+ throws ACFException
+ {
+ super();
+ setName("Job notification thread");
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ // Create a thread context object.
+ IThreadContext threadContext = ThreadContextFactory.make();
+ IJobManager jobManager = JobManagerFactory.make(threadContext);
+ IOutputConnectionManager connectionManager = OutputConnectionManagerFactory.make(threadContext);
+
+ // Loop
+ while (true)
+ {
+ // Do another try/catch around everything in the loop
+ try
+ {
+ Long[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity();
+
+ int k = 0;
+ while (k < jobsNeedingNotification.length)
+ {
+ Long jobID = jobsNeedingNotification[k++];
+ IJobDescription job = jobManager.load(jobID,true);
+ if (job != null)
+ {
+ // Get the connection name
+ String connectionName = job.getOutputConnectionName();
+ IOutputConnection connection = connectionManager.load(connectionName);
+ if (connection != null)
+ {
+ // Grab an appropriate connection instance
+ IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+ if (connector != null)
+ {
+ try
+ {
+ // Do the notification itself
+ try
+ {
+ connector.noteJobComplete();
+ }
+ catch (ServiceInterruption e)
+ {
+ Logging.threads.warn("Service interruption notifying connection - retrying: "+e.getMessage(),e);
+ continue;
+ }
+ catch (ACFException e)
+ {
+ if (e.getErrorCode() == ACFException.INTERRUPTED)
+ throw e;
+ if (e.getErrorCode() == ACFException.DATABASE_CONNECTION_ERROR)
+ throw e;
+ if (e.getErrorCode() == ACFException.SETUP_ERROR)
+ throw e;
+ // Nothing special; report the error and keep going.
+ Logging.threads.error(e.getMessage(),e);
+ continue;
+ }
+ // When done, put the job into the Inactive state.
+ jobManager.inactivateJob(jobID);
+ }
+ finally
+ {
+ OutputConnectorFactory.release(connector);
+ }
+ }
+ }
+ }
+ }
+ ACF.sleep(10000L);
+ }
+ catch (ACFException e)
+ {
+ if (e.getErrorCode() == ACFException.INTERRUPTED)
+ break;
+
+ if (e.getErrorCode() == ACFException.DATABASE_CONNECTION_ERROR)
+ {
+ Logging.threads.error("Job notification thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+ try
+ {
+ // Give the database a chance to catch up/wake up
+ ACF.sleep(10000L);
+ }
+ catch (InterruptedException se)
+ {
+ break;
+ }
+ continue;
+ }
+
+ // Log it, but keep the thread alive
+ Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+
+ if (e.getErrorCode() == ACFException.SETUP_ERROR)
+ {
+ // Shut the whole system down!
+ System.exit(1);
+ }
+
+ }
+ catch (InterruptedException e)
+ {
+ // We're supposed to quit
+ break;
+ }
+ catch (OutOfMemoryError e)
+ {
+ System.err.println("agents process ran out of memory - shutting down");
+ e.printStackTrace(System.err);
+ System.exit(-200);
+ }
+ catch (Throwable e)
+ {
+ // A more severe error - but stay alive
+ Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ // Severe error on initialization
+ System.err.println("agents process could not start - shutting down");
+ Logging.threads.fatal("JobNotificationThread initialization error tossed: "+e.getMessage(),e);
+ System.exit(-300);
+ }
+ }
+
+}
Propchange: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java
------------------------------------------------------------------------------
svn:keywords = Id
Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java Tue Aug 31 19:12:14 2010
@@ -25,8 +25,8 @@ import org.apache.acf.crawler.system.Log
import java.util.*;
import java.lang.reflect.*;
-/** This class represents the thread that notices jobs that have completed their shutdown phase, and resets them back to
-* inactive.
+/** This class represents the thread that notices jobs that have completed their shutdown phase, and puts them in the
+* "notify connector" state.
*/
public class JobResetThread extends Thread
{
@@ -73,8 +73,17 @@ public class JobResetThread extends Thre
null,connectionManager.ACTIVITY_JOBABORT,null,
desc.getID().toString()+"("+desc.getDescription()+")",null,null,null);
}
- jobManager.resetJobs(currentTime);
-
+ ArrayList jobCompletions = new ArrayList();
+ jobManager.resetJobs(currentTime,jobCompletions);
+ k = 0;
+ while (k < jobCompletions.size())
+ {
+ IJobDescription desc = (IJobDescription)jobCompletions.get(k++);
+ connectionManager.recordHistory(desc.getConnectionName(),
+ null,connectionManager.ACTIVITY_JOBEND,null,
+ desc.getID().toString()+"("+desc.getDescription()+")",null,null,null);
+ }
+
// If there were any job aborts, we must reprioritize all active documents, since we've done something
// not predicted by the algorithm that assigned those priorities. This is, of course, quite expensive,
// but it cannot be helped (at least, I cannot find a way to avoid it).